
Commit ca55f1c

Revert use file schema in parquet pruning (#16086)
* wip
* comment
* Update datafusion/core/src/datasource/physical_plan/parquet.rs
* remove prints
* better test
* fmt
1 parent 46d3f52 commit ca55f1c


4 files changed: +120, -17 lines changed


datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 101 additions & 2 deletions
@@ -39,7 +39,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
         ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
-        StructArray,
+        StringViewArray, StructArray,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
@@ -100,6 +100,7 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
         page_index_predicate: bool,
+        bloom_filters: bool,
     }

     impl RoundTrip {
@@ -132,6 +133,11 @@
             self
         }

+        fn with_bloom_filters(mut self) -> Self {
+            self.bloom_filters = true;
+            self
+        }
+
         /// run the test, returning only the resulting RecordBatches
         async fn round_trip_to_batches(
             self,
@@ -156,10 +162,20 @@
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
+            } else {
+                source = source.with_pushdown_filters(false);
             }

             if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
+            } else {
+                source = source.with_enable_page_index(false);
+            }
+
+            if self.bloom_filters {
+                source = source.with_bloom_filter_on_read(true);
+            } else {
+                source = source.with_bloom_filter_on_read(false);
             }

             source.with_schema(Arc::clone(&file_schema))
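
The round-trip harness now sets each reader option explicitly in both directions rather than relying on defaults. For reference, a hedged sketch of how the same three parquet reader features are typically toggled at the session level; the `datafusion.execution.parquet.*` key names are assumptions about DataFusion's configuration namespace and are not part of this commit:

```rust
use datafusion::prelude::*;

// Sketch only: enable filter pushdown, page-index pruning, and bloom-filter
// reads for all parquet scans in a session. Key names are assumed from the
// `datafusion.execution.parquet.*` namespace and may vary by version.
fn session_with_parquet_options() -> SessionContext {
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true)
        .set_bool("datafusion.execution.parquet.enable_page_index", true)
        .set_bool("datafusion.execution.parquet.bloom_filter_on_read", true);
    SessionContext::new_with_config(config)
}
```
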
@@ -817,7 +833,7 @@
     }

     #[tokio::test]
-    async fn evolved_schema_filter() {
+    async fn evolved_schema_column_order_filter() {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

@@ -848,6 +864,88 @@
         assert_eq!(read.len(), 0);
     }

+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_strings() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef =
+            Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+        assert_eq!(rt.batches.unwrap().len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_ints() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(rt.batches.unwrap().len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
+    }
+
     #[tokio::test]
     async fn evolved_schema_disjoint_schema_filter() {
         let c1: ArrayRef =
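
The two new tests above exercise pruning when the file's physical type (Utf8View or Int8) differs from the table's declared type (Utf8 or UInt64). As a minimal, hypothetical illustration of the idea (plain arrow-rs, not the code path this commit touches), file values are coerced to the table-side type before being compared with the filter literal:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int8Array, UInt64Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // Values as stored in the file: Int8.
    let file_values: ArrayRef = Arc::new(Int8Array::from(vec![Some(1i8), Some(2)]));

    // Coerce to the table schema's UInt64 type, as filtering and pruning must
    // do before comparing against a UInt64 literal such as `5`.
    let coerced = cast(&file_values, &DataType::UInt64).unwrap();
    let coerced = coerced.as_any().downcast_ref::<UInt64Array>().unwrap();

    assert_eq!(coerced.value(0), 1);
    assert_eq!(coerced.value(1), 2);
    // Neither value equals 5, matching the "prune all row groups" case above.
}
```
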
@@ -1748,6 +1846,7 @@
         let rt = RoundTrip::new()
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
+            .with_bloom_filters()
             .round_trip(vec![batch1])
             .await;

datafusion/datasource-parquet/src/opener.rs

Lines changed: 16 additions & 12 deletions
@@ -55,8 +55,9 @@ pub(super) struct ParquetOpener {
     pub limit: Option<usize>,
     /// Optional predicate to apply during the scan
    pub predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Schema of the output table
-    pub table_schema: SchemaRef,
+    /// Schema of the output table without partition columns.
+    /// This is the schema we coerce the physical file schema into.
+    pub logical_file_schema: SchemaRef,
     /// Optional hint for how large the initial request to read parquet metadata
     /// should be
     pub metadata_size_hint: Option<usize>,
@@ -104,13 +105,13 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;

         let projected_schema =
-            SchemaRef::from(self.table_schema.project(&self.projection)?);
+            SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
         let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
         let schema_adapter = self
             .schema_adapter_factory
-            .create(projected_schema, Arc::clone(&self.table_schema));
+            .create(projected_schema, Arc::clone(&self.logical_file_schema));
         let predicate = self.predicate.clone();
-        let table_schema = Arc::clone(&self.table_schema);
+        let logical_file_schema = Arc::clone(&self.logical_file_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let coerce_int96 = self.coerce_int96;
@@ -141,17 +142,20 @@
                 .await?;

             // Note about schemas: we are actually dealing with **3 different schemas** here:
-            // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
-            // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
+            // - The table schema as defined by the TableProvider.
+            //   This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+            // - The logical file schema: this is the table schema minus any hive partition columns and projections.
+            //   This is what the physical file schema is coerced to.
             // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
             let mut physical_file_schema = Arc::clone(reader_metadata.schema());

             // The schema loaded from the file may not be the same as the
             // desired schema (for example if we want to instruct the parquet
             // reader to read strings using Utf8View instead). Update if necessary
-            if let Some(merged) =
-                apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
-            {
+            if let Some(merged) = apply_file_schema_type_coercions(
+                &logical_file_schema,
+                &physical_file_schema,
+            ) {
                 physical_file_schema = Arc::new(merged);
                 options = options.with_schema(Arc::clone(&physical_file_schema));
                 reader_metadata = ArrowReaderMetadata::try_new(
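
To make the comment's distinction concrete, here is a small hypothetical example (column and partition names invented for illustration) of the three schemas for a table partitioned by `date` whose parquet file stores strings as `Utf8View`:

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Table schema: what the TableProvider exposes, including partition columns.
    let table_schema = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("date", DataType::Utf8, false), // hive partition column
    ]);

    // Logical file schema: the table schema minus partition columns; the
    // target the physical file schema is coerced into.
    let logical_file_schema =
        Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);

    // Physical file schema: what the parquet file actually stores.
    let physical_file_schema =
        Schema::new(vec![Field::new("c1", DataType::Utf8View, true)]);

    assert_ne!(table_schema, logical_file_schema);
    assert_ne!(logical_file_schema, physical_file_schema);
}
```
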
@@ -178,7 +182,7 @@
             // Build predicates for this specific file
             let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
                 predicate.as_ref(),
-                &physical_file_schema,
+                &logical_file_schema,
                 &predicate_creation_errors,
             );
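
With this change the pruning predicates are built against the logical file schema again. As a simplified sketch of what such a predicate decides per row group (an illustration, not DataFusion's actual pruning code), an equality filter can skip a row group whose statistics cannot contain the literal:

```rust
/// Hypothetical helper: an equality predicate can prune a row group when the
/// literal lies outside the [min, max] range of the column statistics, with
/// both sides expressed in the logical (table-side) type.
fn can_prune_eq(min: u64, max: u64, literal: u64) -> bool {
    literal < min || literal > max
}

fn main() {
    // Statistics for an Int8 file column {1, 2}, coerced to the table's UInt64 type.
    assert!(can_prune_eq(1, 2, 5));  // `c1 = 5` skips the row group
    assert!(!can_prune_eq(1, 2, 1)); // `c1 = 1` must read it
}
```
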

@@ -215,7 +219,7 @@
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
                     &physical_file_schema,
-                    &table_schema,
+                    &logical_file_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 2 additions & 2 deletions
@@ -426,7 +426,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool>
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
     physical_file_schema: &SchemaRef,
-    table_schema: &SchemaRef,
+    logical_file_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
@@ -447,7 +447,7 @@
             FilterCandidateBuilder::new(
                 Arc::clone(expr),
                 Arc::clone(physical_file_schema),
-                Arc::clone(table_schema),
+                Arc::clone(logical_file_schema),
                 Arc::clone(schema_adapter_factory),
             )
             .build(metadata)

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 1 deletion
@@ -481,7 +481,7 @@ impl FileSource for ParquetSource {
                 .expect("Batch size must set before creating ParquetOpener"),
             limit: base_config.limit,
             predicate: self.predicate.clone(),
-            table_schema: Arc::clone(&base_config.file_schema),
+            logical_file_schema: Arc::clone(&base_config.file_schema),
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics().clone(),
             parquet_file_reader_factory,
