This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit af12a21

HyukjinKwon authored and liancheng committed
[SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources
## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case where the pushed-down filter is `Literal(null)`, and drops it from the Spark-side post-filter. For example, the code below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows that `null` is kept properly:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)    << Here `null` is there
+- LocalTableScan [_1#17]
```

However, when we read the same data back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed from the post-filter:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)    << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it so that the `null` literal is kept. In more detail, in

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

the `null` ends up in `partitionKeyFilters`: a `Literal` never has `children`, so its `references` set is empty, and the empty set is a subset of any `partitionSet`.
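The subset check described above can be sketched in plain Scala (a minimal model using sets of column names, not Spark's `Expression`/`AttributeSet` API; the partition column names are made up for illustration):

```scala
// Minimal sketch of why Literal(null) slips into partitionKeyFilters:
// a literal references no attributes, and an empty set is (vacuously)
// a subset of every set, including the set of partition columns.
object SubsetSketch {
  def main(args: Array[String]): Unit = {
    val partitionSet    = Set("year", "month") // hypothetical partition columns
    val literalNullRefs = Set.empty[String]    // Literal(null) has no references
    val comparisonRefs  = Set("_1")            // e.g. isnotnull(_1) references _1

    println(literalNullRefs.subsetOf(partitionSet)) // true  -> misclassified as a partition filter
    println(comparisonRefs.subsetOf(partitionSet))  // false -> stays a data filter
  }
}
```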
Then, in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

the `null` is always subtracted out of the post-filter. So, when a filter's referenced fields are empty, it should also be applied to the data columns. After this PR, the plan becomes:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`.

Author: hyukjinkwon <[email protected]>

Closes apache#16184 from HyukjinKwon/SPARK-18753.

(cherry picked from commit 89ae26d)
Signed-off-by: Cheng Lian <[email protected]>
1 parent 16d4bd4 commit af12a21

File tree

2 files changed: +12 −1


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -86,7 +86,7 @@ object FileSourceStrategy extends Strategy with Logging {
     val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

     // Predicates with both partition keys and attributes need to be evaluated after the scan.
-    val afterScanFilters = filterSet -- partitionKeyFilters
+    val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
     logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")

     val filterAttributes = AttributeSet(afterScanFilters)
```
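The effect of this one-line change can be modeled with plain Scala sets (a sketch under simplified assumptions; `Filter` here is a hypothetical stand-in for Spark's `Expression`, and the names are made up):

```scala
// Sketch: subtracting ALL partition-key filters drops the pushed-down
// null literal, while subtracting only those with non-empty references
// keeps it in the post-scan filter set.
object AfterScanSketch {
  case class Filter(name: String, references: Set[String])

  def main(args: Array[String]): Unit = {
    val isNotNull = Filter("isnotnull(_1)", Set("_1"))
    val nullLit   = Filter("null", Set.empty)      // pushed-down Literal(null)
    val filterSet = Set(isNotNull, nullLit)

    // No partition columns here, yet the null literal is still classified
    // as a partition-key filter, because its empty reference set is a
    // subset of any partition set.
    val partitionSet        = Set.empty[String]
    val partitionKeyFilters = filterSet.filter(_.references.subsetOf(partitionSet))

    // Before the fix: the null literal is subtracted away.
    val before = filterSet -- partitionKeyFilters
    // After the fix: only reference-bearing partition filters are removed,
    // so the null literal survives as a Spark-side post-filter.
    val after = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)

    println(before.map(_.name)) // contains only isnotnull(_1)
    println(after.map(_.name))  // contains both isnotnull(_1) and null
  }
}
```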

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 11 additions & 0 deletions

```diff
@@ -476,6 +476,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }

+  test("[SPARK-18753] keep pushed-down null literal as a filter in Spark-side post-filter") {
+    val ds = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDS()
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      ds.write.parquet(path)
+      val readBack = spark.read.parquet(path).filter($"_1" === "true")
+      val filtered = ds.filter($"_1" === "true").toDF()
+      checkAnswer(readBack, filtered)
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.

   protected val checkPartitionSchema =
```
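Why the surviving `null` literal changes query results comes down to SQL's three-valued logic: a row is kept only when the whole filter predicate evaluates to TRUE, so `isnotnull(_1) && null` keeps no rows, while `isnotnull(_1)` alone keeps two of the three test rows. A plain-Scala sketch of that logic, modeling SQL NULL as `None` (an illustration, not Spark's actual expression evaluator):

```scala
// Three-valued AND: FALSE dominates, TRUE && TRUE is TRUE,
// anything else involving NULL stays NULL.
object ThreeValuedSketch {
  type SqlBool = Option[Boolean] // None models SQL NULL

  def and(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None // NULL-propagating
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Some(true), None, Some(false))  // the test data
    val isNotNull: Option[Boolean] => SqlBool = v => Some(v.isDefined)

    // Correct plan: isnotnull(_1) && null -- never TRUE, keeps no rows.
    val correct = rows.filter(r => and(isNotNull(r), None).contains(true))
    // Buggy plan (null literal dropped): isnotnull(_1) keeps two rows.
    val buggy = rows.filter(r => isNotNull(r).contains(true))

    println(correct.size) // 0
    println(buggy.size)   // 2
  }
}
```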
