
Commit 5d726b8

dbtsai and dongjoon-hyun committed
[SPARK-25559][SQL] Remove the unsupported predicates in Parquet when possible
## What changes were proposed in this pull request?

Currently, in `ParquetFilters`, if one of the child predicates is not supported by Parquet, the entire predicate is thrown away. In fact, if the unsupported predicate is in the top-level `And` condition, or in a child `And` before hitting a `Not` or `Or` condition, it can be safely removed.

## How was this patch tested?

Tests are added.

Closes apache#22574 from dbtsai/removeUnsupportedPredicatesInParquet.

Lead-authored-by: DB Tsai <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: DB Tsai <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 9362c5c commit 5d726b8
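To make the reasoning in the commit message concrete, here is a minimal, standalone Scala sketch (not part of the patch; the Row type, the data, and the concrete predicates are invented for illustration) of why dropping the unsupported side of an `And` is safe at the top level but not underneath a `Not`:

// Standalone model, not Spark code.
object PushdownSafetySketch {
  // Hypothetical row: column "a" is an Int, column "b" is a String.
  final case class Row(a: Int, b: String)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(2, "1"), Row(2, "2"), Row(3, "1"))

    // Full predicate: NOT(a = 2 AND b = '1') -- the shape used in the code comment below.
    val full: Row => Boolean = r => !(r.a == 2 && r.b == "1")
    // If b = '1' were unsupported and only NOT(a = 2) were pushed down:
    val wrongPushed: Row => Boolean = r => !(r.a == 2)

    println(rows.filter(full))        // List(Row(2,2), Row(3,1))
    println(rows.filter(wrongPushed)) // List(Row(3,1)) -- Row(2,2) is wrongly skipped,
                                      // which is why removal is disallowed under Not/Or.

    // At the top level, dropping the unsupported side only weakens the pushed filter
    // (it matches a superset of rows), and Spark still re-applies the full predicate
    // to the rows it reads back, so no result is lost.
    val topLevel: Row => Boolean = r => r.a == 2 && r.b == "1"
    val relaxed: Row => Boolean = r => r.a == 2 // unsupported b = '1' dropped
    assert(rows.filter(topLevel).forall(relaxed))
  }
}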

2 files changed: +172 -13 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 27 additions & 11 deletions
@@ -394,7 +394,13 @@ private[parquet] class ParquetFilters(
    */
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToParquetField = getFieldMap(schema)
+    createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true)
+  }
 
+  private def createFilterHelper(
+      nameToParquetField: Map[String, ParquetField],
+      predicate: sources.Filter,
+      canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
     // Decimal type must make sure that filter value's scale matched the file.
     // If doesn't matched, which would cause data corruption.
     def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
@@ -488,26 +494,36 @@ private[parquet] class ParquetFilters(
         .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.And(lhs, rhs) =>
-        // At here, it is not safe to just convert one side if we do not understand the
-        // other side. Here is an example used to explain the reason.
+        // At here, it is not safe to just convert one side and remove the other side
+        // if we do not understand what the parent filters are.
+        //
+        // Here is an example used to explain the reason.
         // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
         // convert b in ('1'). If we only convert a = 2, we will end up with a filter
         // NOT(a = 2), which will generate wrong results.
-        // Pushing one side of AND down is only safe to do at the top level.
-        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
-        for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
-        } yield FilterApi.and(lhsFilter, rhsFilter)
+        //
+        // Pushing one side of AND down is only safe to do at the top level or in the child
+        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
+        // can be safely removed.
+        val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
+        val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+
+        (lhsFilterOption, rhsFilterOption) match {
+          case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
+          case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
+          case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+          case _ => None
+        }
 
       case sources.Or(lhs, rhs) =>
         for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
+          lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
+          rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
         } yield FilterApi.or(lhsFilter, rhsFilter)
 
       case sources.Not(pred) =>
-        createFilter(schema, pred).map(FilterApi.not)
+        createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+          .map(FilterApi.not)
 
       case sources.In(name, values) if canMakeFilterOn(name, values.head)
         && values.distinct.length <= pushDownInFilterThreshold =>
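As a hedged, end-to-end illustration of the effect of this change (not from the patch; the path, column names, and data below are made up), a query whose filter mixes a Parquet-convertible predicate with one that `ParquetFilters` cannot convert, such as `StringContains`, can now still push the convertible half down, while Spark continues to evaluate the full filter on the rows returned:

import org.apache.spark.sql.SparkSession

object AndPushdownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("spark-25559-demo").getOrCreate()
    import spark.implicits._

    // Write a tiny Parquet dataset with an Int column "a" and a String column "b".
    Seq((1, "prefix_x"), (20, "other")).toDF("a", "b")
      .write.mode("overwrite").parquet("/tmp/spark-25559-demo")

    val df = spark.read.parquet("/tmp/spark-25559-demo")
      // "a < 10" converts to a Parquet filter; "contains" does not, so before this
      // change the whole AND was dropped from pushdown.
      .filter($"a" < 10 && $"b".contains("prefix"))

    df.show()
    // df.explain(true) should now report a pushed filter equivalent to LessThan(a,10);
    // the exact PushedFilters output depends on the Spark version.
    spark.stop()
  }
}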

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 145 additions & 2 deletions
@@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+  test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") {
     val schema = StructType(Seq(
       StructField("a", IntegerType, nullable = false),
       StructField("b", StringType, nullable = true),
@@ -770,14 +770,95 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
         sources.GreaterThan("c", 1.5D)))
     }
 
-    assertResult(None) {
+    // Testing when `canRemoveOneSideInAnd == true`
+    //   case sources.And(lhs, rhs) =>
+    //     ...
+    //     case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
+    assertResult(Some(lt(intColumn("a"), 10: Integer))) {
       parquetFilters.createFilter(
         parquetSchema,
         sources.And(
           sources.LessThan("a", 10),
           sources.StringContains("b", "prefix")))
     }
 
+    // Testing when `canRemoveOneSideInAnd == true`
+    //   case sources.And(lhs, rhs) =>
+    //     ...
+    //     case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+    assertResult(Some(lt(intColumn("a"), 10: Integer))) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.And(
+          sources.StringContains("b", "prefix"),
+          sources.LessThan("a", 10)))
+    }
+
+    // Testing complex And conditions
+    assertResult(Some(
+      FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.And(
+          sources.And(
+            sources.LessThan("a", 10),
+            sources.StringContains("b", "prefix")
+          ),
+          sources.GreaterThan("a", 5)))
+    }
+
+    // Testing complex And conditions
+    assertResult(Some(
+      FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.And(
+          sources.GreaterThan("a", 5),
+          sources.And(
+            sources.StringContains("b", "prefix"),
+            sources.LessThan("a", 10)
+          )))
+    }
+
+    // Testing
+    //   case sources.Or(lhs, rhs) =>
+    //     ...
+    //       lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Or(
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix")),
+          sources.GreaterThan("a", 2)))
+    }
+
+    // Testing
+    //   case sources.Or(lhs, rhs) =>
+    //     ...
+    //       rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Or(
+          sources.GreaterThan("a", 2),
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix"))))
+    }
+
+    // Testing
+    //   case sources.Not(pred) =>
+    //     createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //       .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing when `canRemoveOneSideInAnd == false`
+    //   case sources.And(lhs, rhs) =>
+    //     ...
+    //     case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
     assertResult(None) {
       parquetFilters.createFilter(
         parquetSchema,
@@ -786,6 +867,68 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
             sources.GreaterThan("a", 1),
             sources.StringContains("b", "prefix"))))
     }
+
+    // Testing
+    //   case sources.Not(pred) =>
+    //     createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //       .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing when `canRemoveOneSideInAnd == false`
+    //   case sources.And(lhs, rhs) =>
+    //     ...
+    //     case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Not(
+          sources.And(
+            sources.StringContains("b", "prefix"),
+            sources.GreaterThan("a", 1))))
+    }
+
+    // Testing
+    //   case sources.Not(pred) =>
+    //     createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //       .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing passing `canRemoveOneSideInAnd = false` into
+    //   case sources.And(lhs, rhs) =>
+    //     val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Not(
+          sources.And(
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix")),
+            sources.GreaterThan("a", 2))))
+    }
+
+    // Testing
+    //   case sources.Not(pred) =>
+    //     createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //       .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing passing `canRemoveOneSideInAnd = false` into
+    //   case sources.And(lhs, rhs) =>
+    //     val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Not(
+          sources.And(
+            sources.GreaterThan("a", 2),
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix")))))
+    }
   }
 
   test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
