Commit 8d9495a

yucaicloud-fan authored and committed
[SPARK-25207][SQL] Case-insensitive field resolution for filter pushdown when reading Parquet

## What changes were proposed in this pull request?

Currently, filter pushdown does not work if the Parquet schema and the Hive metastore schema differ in letter case, even when spark.sql.caseSensitive is false. Consider the following case:

```scala
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t")
sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'")
sql("select * from t where id < 100L").write.csv("/tmp/id")
```

Although Spark generates the filter "ID < 100L", it is never actually pushed down into Parquet, so Spark still performs a full table scan when reading. This PR adds case-insensitive field resolution to make the pushdown work.

Before, "ID < 100L" fails to push down (screenshot: https://user-images.githubusercontent.com/2989575/44530558-40ef8b00-a721-11e8-8abc-7f97671590d3.png).

After, "ID < 100L" pushes down successfully (screenshot: https://user-images.githubusercontent.com/2989575/44530567-44831200-a721-11e8-8634-e9f664b33d39.png).

## How was this patch tested?

Added UTs.

Closes apache#22197 from yucai/SPARK-25207.

Authored-by: yucai <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 515708d commit 8d9495a
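Conceptually, the patch resolves the filter's column name against the Parquet file's field names without regard to case, then builds the predicate with the file's own spelling. A minimal sketch of that idea, with illustrative names (`resolve` is not the patch's actual API):

```scala
// Illustrative sketch only, not the patch's API: resolve a filter's column
// name against the file's field names case-insensitively, returning the
// file's own spelling so the pushed predicate matches the file.
def resolve(filterCol: String, parquetFields: Seq[String]): Option[String] =
  parquetFields.filter(_.equalsIgnoreCase(filterCol)) match {
    case Seq(single) => Some(single) // unambiguous: push down using this name
    case _           => None         // missing or ambiguous: skip pushdown
  }

resolve("ID", Seq("id"))       // Some("id"): "ID < 100L" can now push down
resolve("ID", Seq("id", "ID")) // None: ambiguous duplicates, pushdown skipped
```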

File tree

3 files changed: +179 −29 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -347,6 +347,7 @@ class ParquetFileFormat
       val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
       val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
       val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
 
       (file: PartitionedFile) => {
         assert(file.partitionValues.numFields == partitionSchema.size)
@@ -372,7 +373,7 @@ class ParquetFileFormat
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
         val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
-          pushDownStringStartWith, pushDownInFilterThreshold)
+          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
```
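For context, the `sqlConf.caseSensitiveAnalysis` flag threaded through above mirrors the user-facing `spark.sql.caseSensitive` setting, which defaults to false. A minimal sketch, assuming a live `SparkSession` named `spark`:

```scala
// spark.sql.caseSensitive is the user-facing setting behind
// sqlConf.caseSensitiveAnalysis; it defaults to false.
spark.conf.set("spark.sql.caseSensitive", "false")
val isCaseSensitive = spark.conf.get("spark.sql.caseSensitive").toBoolean
```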

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 63 additions & 27 deletions

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
+import java.util.Locale
 
 import scala.collection.JavaConverters.asScalaBufferConverter
 
@@ -31,7 +32,7 @@ import org.apache.parquet.schema.OriginalType._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
@@ -44,7 +45,18 @@ private[parquet] class ParquetFilters(
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
-    pushDownInFilterThreshold: Int) {
+    pushDownInFilterThreshold: Int,
+    caseSensitive: Boolean) {
+
+  /**
+   * Holds a single field information stored in the underlying parquet file.
+   *
+   * @param fieldName field name in parquet file
+   * @param fieldType field type related info in parquet file
+   */
+  private case class ParquetField(
+      fieldName: String,
+      fieldType: ParquetSchemaType)
 
   private case class ParquetSchemaType(
       originalType: OriginalType,
@@ -350,25 +362,38 @@ private[parquet] class ParquetFilters(
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies.
+   * Returns a map, which contains parquet field name and data type, if predicate push down applies.
    */
-  private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
-    case m: MessageType =>
-      // Here we don't flatten the fields in the nested schema but just look up through
-      // root fields. Currently, accessing to nested fields does not push down filters
-      // and it does not support to create filters for them.
-      m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-        f.getName -> ParquetSchemaType(
-          f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)
-      }.toMap
-    case _ => Map.empty[String, ParquetSchemaType]
+  private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
+    // Here we don't flatten the fields in the nested schema but just look up through
+    // root fields. Currently, accessing to nested fields does not push down filters
+    // and it does not support to create filters for them.
+    val primitiveFields =
+      dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetField(f.getName,
+          ParquetSchemaType(f.getOriginalType,
+            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
+      }
+    if (caseSensitive) {
+      primitiveFields.toMap
+    } else {
+      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
+      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
+      // See: SPARK-25132.
+      val dedupPrimitiveFields =
+        primitiveFields
+          .groupBy(_._1.toLowerCase(Locale.ROOT))
+          .filter(_._2.size == 1)
+          .mapValues(_.head._2)
+      CaseInsensitiveMap(dedupPrimitiveFields)
+    }
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val nameToType = getFieldMap(schema)
+    val nameToParquetField = getFieldMap(schema)
 
     // Decimal type must make sure that filter value's scale matched the file.
     // If doesn't matched, which would cause data corruption.
@@ -381,7 +406,7 @@ private[parquet] class ParquetFilters(
     // Parquet's type in the given file should be matched to the value's type
     // in the pushed filter in order to push down the filter to Parquet.
     def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
-      value == null || (nameToType(name) match {
+      value == null || (nameToParquetField(name).fieldType match {
         case ParquetBooleanType => value.isInstanceOf[JBoolean]
         case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
         case ParquetLongType => value.isInstanceOf[JLong]
@@ -408,7 +433,7 @@ private[parquet] class ParquetFilters(
     // filters for the column having dots in the names. Thus, we do not push down such filters.
     // See SPARK-20364.
     def canMakeFilterOn(name: String, value: Any): Boolean = {
-      nameToType.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
+      nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
     }
 
     // NOTE:
@@ -428,29 +453,39 @@ private[parquet] class ParquetFilters(
 
     predicate match {
       case sources.IsNull(name) if canMakeFilterOn(name, null) =>
-        makeEq.lift(nameToType(name)).map(_(name, null))
+        makeEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, null))
       case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
-        makeNotEq.lift(nameToType(name)).map(_(name, null))
+        makeNotEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, null))
 
       case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
-        makeEq.lift(nameToType(name)).map(_(name, value))
+        makeEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
-        makeNotEq.lift(nameToType(name)).map(_(name, value))
+        makeNotEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
-        makeEq.lift(nameToType(name)).map(_(name, value))
+        makeEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
-        makeNotEq.lift(nameToType(name)).map(_(name, value))
+        makeNotEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
-        makeLt.lift(nameToType(name)).map(_(name, value))
+        makeLt.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
-        makeLtEq.lift(nameToType(name)).map(_(name, value))
+        makeLtEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
-        makeGt.lift(nameToType(name)).map(_(name, value))
+        makeGt.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
      case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
-        makeGtEq.lift(nameToType(name)).map(_(name, value))
+        makeGtEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
@@ -477,7 +512,8 @@ private[parquet] class ParquetFilters(
       case sources.In(name, values) if canMakeFilterOn(name, values.head)
         && values.distinct.length <= pushDownInFilterThreshold =>
        values.distinct.flatMap { v =>
-          makeEq.lift(nameToType(name)).map(_(name, v))
+          makeEq.lift(nameToParquetField(name).fieldType)
+            .map(_(nameToParquetField(name).fieldName, v))
        }.reduceLeftOption(FilterApi.or)
 
       case sources.StringStartsWith(name, prefix)
```
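To make the dedup step in `getFieldMap` concrete, here is a minimal, self-contained sketch of the same grouping trick; the `fields` and `dedup` names are illustrative, not from the patch:

```scala
import java.util.Locale

// Fields whose names collide after lower-casing are dropped entirely,
// so pushdown is simply skipped for them (see SPARK-25132).
val fields = Seq("id" -> "INT64", "Name" -> "BINARY", "NAME" -> "BINARY")

val dedup: Map[String, String] =
  fields
    .groupBy(_._1.toLowerCase(Locale.ROOT)) // group by case-insensitive name
    .filter(_._2.size == 1)                 // keep only unambiguous names
    .map { case (lower, single) => lower -> single.head._2 }

println(dedup) // Map(id -> INT64): "Name"/"NAME" are ambiguous and skipped
```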

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 114 additions & 1 deletion

```diff
@@ -25,6 +25,7 @@ import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operato
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
@@ -60,7 +61,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   private lazy val parquetFilters =
     new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
-      conf.parquetFilterPushDownInFilterThreshold)
+      conf.parquetFilterPushDownInFilterThreshold, conf.caseSensitiveAnalysis)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -1021,6 +1022,118 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") {
+    def createParquetFilter(caseSensitive: Boolean): ParquetFilters = {
+      new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
+        conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
+        conf.parquetFilterPushDownInFilterThreshold, caseSensitive)
+    }
+    val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true)
+    val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false)
+
+    def testCaseInsensitiveResolution(
+        schema: StructType,
+        expected: FilterPredicate,
+        filter: sources.Filter): Unit = {
+      val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
+      assertResult(Some(expected)) {
+        caseInsensitiveParquetFilters.createFilter(parquetSchema, filter)
+      }
+      assertResult(None) {
+        caseSensitiveParquetFilters.createFilter(parquetSchema, filter)
+      }
+    }
+
+    val schema = StructType(Seq(StructField("cint", IntegerType)))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT"))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]),
+      sources.IsNotNull("CINT"))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), 1000: Integer),
+      sources.Not(sources.EqualTo("CINT", 1000)))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), 1000: Integer),
+      sources.Not(sources.EqualNullSafe("CINT", 1000)))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.ltEq(intColumn("cint"), 1000: Integer),
+      sources.LessThanOrEqual("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.gtEq(intColumn("cint"), 1000: Integer),
+      sources.GreaterThanOrEqual("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.or(
+        FilterApi.eq(intColumn("cint"), 10: Integer),
+        FilterApi.eq(intColumn("cint"), 20: Integer)),
+      sources.In("CINT", Array(10, 20)))
+
+    val dupFieldSchema = StructType(
+      Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType)))
+    val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
+    assertResult(None) {
+      caseInsensitiveParquetFilters.createFilter(
+        dupParquetSchema, sources.EqualTo("CINT", 1000))
+    }
+  }
+
+  test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
+    withTempPath { dir =>
+      val count = 10
+      val tableName = "spark_25207"
+      val tableDir = dir.getAbsoluteFile + "/table"
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          spark.range(count).selectExpr("id as A", "id as B", "id as b")
+            .write.mode("overwrite").parquet(tableDir)
+        }
+        sql(
+          s"""
+             |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir'
+           """.stripMargin)
+
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          val e = intercept[SparkException] {
+            sql(s"select a from $tableName where b > 0").collect()
+          }
+          assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+            """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
+        }
+
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          checkAnswer(sql(s"select A from $tableName where B > 0"), (1 until count).map(Row(_)))
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
```
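Beyond the unit tests, the effect can be checked interactively by inspecting the scan's `PushedFilters` metadata in the physical plan. A hedged spark-shell example with illustrative paths and table names; the exact plan rendering may vary by version:

```scala
// Column is written as "id" but declared as "ID" in the table schema.
spark.range(1, 1024).write.parquet("/tmp/spark_25207_demo")
sql("CREATE TABLE t_demo (ID LONG) USING parquet LOCATION '/tmp/spark_25207_demo'")
sql("SELECT * FROM t_demo WHERE id < 100L").explain()
// Without the patch the scan reports no pushed filter for the column; with it,
// it reports something like: PushedFilters: [IsNotNull(ID), LessThan(ID,100)]
```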
