Commit 0e48627

adapt filter expressions to file schema during parquet scan instead of using a SchemaAdapter (#16461)
1 parent: db13dd9
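The gist of the change: instead of adapting every decoded `RecordBatch` from the file schema to the table schema through a `SchemaAdapter`, the scan rewrites the filter expression itself so it evaluates directly against the file's physical schema. Below is a minimal sketch of that idea, assuming a DataFusion version that exports `PhysicalExprSchemaRewriter` from `datafusion_physical_expr` (as the diff below does); the schemas and predicate here are illustrative, not taken from the commit:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExprSchemaRewriter;

fn main() -> Result<()> {
    // Logical (table) schema: c1 is a millisecond timestamp.
    let logical = Schema::new(vec![Field::new(
        "c1",
        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
        false,
    )]);
    // Physical (file) schema: the parquet file stores nanoseconds.
    let physical = Schema::new(vec![Field::new(
        "c1",
        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
        false,
    )]);

    // Predicate planned against the table schema: c1 = 1970-01-01T00:00:01Z.
    let predicate = Arc::new(BinaryExpr::new(
        col("c1", &logical)?,
        Operator::Eq,
        lit(ScalarValue::TimestampMillisecond(Some(1_000), Some("UTC".into()))),
    ));

    // Rewrite the expression to refer to the file schema, inserting casts
    // where the types differ, instead of casting whole record batches.
    let rewritten =
        PhysicalExprSchemaRewriter::new(&physical, &logical).rewrite(predicate)?;
    println!("{rewritten}");
    Ok(())
}
```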

5 files changed: +680 -147 lines changed

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 67 additions & 1 deletion
```diff
@@ -39,7 +39,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
         ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
-        StringViewArray, StructArray,
+        StringViewArray, StructArray, TimestampNanosecondArray,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
@@ -960,6 +960,72 @@ mod tests {
         assert_eq!(read, 2, "Expected 2 rows to match the predicate");
     }
 
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_timestamp_units() {
+        // The table and filter have a common data type:
+        // the table schema is in milliseconds, but the file schema is in nanoseconds.
+        let c1: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
+            Some(1_000_000_000), // 1970-01-01T00:00:01Z
+            Some(2_000_000_000), // 1970-01-01T00:00:02Z
+            Some(3_000_000_000), // 1970-01-01T00:00:03Z
+            Some(4_000_000_000), // 1970-01-01T00:00:04Z
+        ]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+            false,
+        )]));
+        // One row should match, 2 pruned via page index, 1 pruned via filter pushdown
+        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
+            Some(1_000),
+            Some("UTC".into()),
+        )));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .with_page_index_predicate() // produces pages with 2 rows each (2 pages total for our data)
+            .with_table_schema(table_schema.clone())
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors and we keep 1 row
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 1, "Expected 1 row to match the predicate");
+        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
+        assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
+        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
+        // If we filter with a value that is completely out of the range of the data
+        // we prune at the row group level.
+        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
+            Some(5_000),
+            Some("UTC".into()),
+        )));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .with_table_schema(table_schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors and we keep 0 rows
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 0, "Expected 0 rows to match the predicate");
+        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 1);
+    }
+
     #[tokio::test]
     async fn evolved_schema_disjoint_schema_filter() {
         let c1: ArrayRef =
```
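A quick sanity check on the unit arithmetic the test above relies on, in plain `arrow` (this snippet is not part of the commit): casting the file's nanosecond values to the table's millisecond unit divides by 1,000,000, so the filter literal `TimestampMillisecond(1_000)` lines up with exactly one stored row.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampNanosecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // File data: nanosecond timestamps, as in the test above.
    let ns: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
        1_000_000_000_i64, // 1970-01-01T00:00:01Z
        2_000_000_000,     // 1970-01-01T00:00:02Z
    ]));
    // Cast to the table's unit (timezone omitted for brevity):
    // 1_000_000_000 ns becomes 1_000 ms, matching the filter literal.
    let ms = cast(&ns, &DataType::Timestamp(TimeUnit::Millisecond, None)).unwrap();
    println!("{ms:?}");
}
```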

datafusion/datasource-parquet/src/opener.rs

Lines changed: 131 additions & 5 deletions
```diff
@@ -37,6 +37,7 @@ use datafusion_common::pruning::{
 };
 use datafusion_common::{exec_err, Result};
 use datafusion_datasource::PartitionedFile;
+use datafusion_physical_expr::PhysicalExprSchemaRewriter;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::pruning::PruningPredicate;
 use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
@@ -117,7 +118,6 @@ impl FileOpener for ParquetOpener {
 
         let projected_schema =
             SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
-        let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
         let schema_adapter = self
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.logical_file_schema));
@@ -159,7 +159,7 @@
             if let Some(pruning_predicate) = pruning_predicate {
                 // The partition column schema is the schema of the table - the schema of the file
                 let mut pruning = Box::new(PartitionPruningStatistics::try_new(
-                    vec![file.partition_values],
+                    vec![file.partition_values.clone()],
                     partition_fields.clone(),
                 )?)
                     as Box<dyn PruningStatistics>;
```
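Aside: the `.clone()` added above is a knock-on effect of the rewrite step introduced later in this function (next hunk), which consumes `file.partition_values` by value in its `with_partition_columns` call, so the partition pruning statistics now take a copy.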
```diff
@@ -248,10 +248,27 @@
             }
         }
 
+        // Adapt the predicate to the physical file schema.
+        // This evaluates missing columns and inserts any necessary casts.
+        let predicate = predicate
+            .map(|p| {
+                PhysicalExprSchemaRewriter::new(
+                    &physical_file_schema,
+                    &logical_file_schema,
+                )
+                .with_partition_columns(
+                    partition_fields.to_vec(),
+                    file.partition_values,
+                )
+                .rewrite(p)
+                .map_err(ArrowError::from)
+            })
+            .transpose()?;
+
         // Build predicates for this specific file
         let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
             predicate.as_ref(),
-            &logical_file_schema,
+            &physical_file_schema,
            &predicate_creation_errors,
         );
 
```
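The `with_partition_columns` call above is what keeps predicates over partition columns working after the rewrite: a reference to a partition column, which does not exist in the file at all, is replaced with that file's partition value as a literal. A small sketch under the same assumptions as the earlier example (schema and values are illustrative):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExprSchemaRewriter;

fn main() -> Result<()> {
    // The table schema has the partition column; the file itself only has `a`.
    let logical = Schema::new(vec![
        Field::new("part", DataType::Int32, false),
        Field::new("a", DataType::Int32, false),
    ]);
    let physical = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

    // part = 2, planned against the table schema.
    let predicate = Arc::new(BinaryExpr::new(
        col("part", &logical)?,
        Operator::Eq,
        lit(ScalarValue::Int32(Some(2))),
    ));

    // For a file under part=1/, the rewrite turns `part` into the literal 1,
    // so downstream pruning can evaluate `1 = 2` without reading the file.
    let rewritten = PhysicalExprSchemaRewriter::new(&physical, &logical)
        .with_partition_columns(
            vec![Arc::new(Field::new("part", DataType::Int32, false))],
            vec![ScalarValue::Int32(Some(1))],
        )
        .rewrite(predicate)?;
    println!("{rewritten}");
    Ok(())
}
```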
```diff
@@ -288,11 +305,9 @@
         let row_filter = row_filter::build_row_filter(
             &predicate,
             &physical_file_schema,
-            &logical_file_schema,
             builder.metadata(),
             reorder_predicates,
             &file_metrics,
-            &schema_adapter_factory,
         );
 
         match row_filter {
```
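Because the predicate has already been rewritten against the physical file schema at this point, `build_row_filter` no longer needs the logical file schema or a `SchemaAdapterFactory` to reconcile the two, so both arguments are dropped.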
```diff
@@ -879,4 +894,115 @@ mod test {
         assert_eq!(num_batches, 0);
         assert_eq!(num_rows, 0);
     }
+
+    #[tokio::test]
+    async fn test_prune_on_partition_value_and_data_value() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Note: number 3 is missing!
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
+
+        let file_schema = batch.schema();
+        let mut file = PartitionedFile::new(
+            "part=1/file.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+        file.partition_values = vec![ScalarValue::Int32(Some(1))];
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("part", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+
+        let make_opener = |predicate| {
+            ParquetOpener {
+                partition_index: 0,
+                projection: Arc::new([0]),
+                batch_size: 1024,
+                limit: None,
+                predicate: Some(predicate),
+                logical_file_schema: file_schema.clone(),
+                metadata_size_hint: None,
+                metrics: ExecutionPlanMetricsSet::new(),
+                parquet_file_reader_factory: Arc::new(
+                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
+                ),
+                partition_fields: vec![Arc::new(Field::new(
+                    "part",
+                    DataType::Int32,
+                    false,
+                ))],
+                pushdown_filters: true, // note that this is true!
+                reorder_filters: true,
+                enable_page_index: false,
+                enable_bloom_filter: false,
+                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+                enable_row_group_stats_pruning: false, // note that this is false!
+                coerce_int96: None,
+            }
+        };
+
+        let make_meta = || FileMeta {
+            object_meta: ObjectMeta {
+                location: Path::from("part=1/file.parquet"),
+                last_modified: Utc::now(),
+                size: u64::try_from(data_size).unwrap(),
+                e_tag: None,
+                version: None,
+            },
+            range: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        // Filter should match the partition value and data value
+        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // Filter should match the partition value but not the data value
+        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // Filter should not match the partition value but match the data value
+        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 1);
+
+        // Filter should not match the partition value or the data value
+        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(make_meta(), file).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 0);
+        assert_eq!(num_rows, 0);
+    }
 }
```
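To follow the four assertions above: once the partition value from `part=1/file.parquet` is inlined into the predicate, the cases conceptually reduce to:

- `part = 1 OR a = 1` → `1 = 1 OR a = 1`: true for every row, all 3 rows kept
- `part = 1 OR a = 3` → true for every row regardless of `a`: all 3 rows kept
- `part = 2 OR a = 1` → effectively `a = 1`: 1 row kept
- `part = 2 OR a = 3` → effectively `a = 3`, which matches nothing, so filter pushdown removes every row (row-group stats pruning is disabled here): 0 batches, 0 rows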
