Skip to content

Commit a369fc1

Browse files
author
Robert Kruszewski
committed
revert filter tests
1 parent 3bd969f commit a369fc1

File tree

1 file changed

+69
-14
lines changed

1 file changed

+69
-14
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,59 @@ private[parquet] class ParquetFilters(
9191
Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
9292
}
9393

// Converts the untyped value set coming from a `sources.In` predicate into a
// Parquet `FilterPredicate` backed by a user-defined SetInFilter, dispatched
// on the Parquet type of the column. Only defined for types whose pushdown is
// enabled (date/timestamp/decimal cases are gated by the pushDown* flags), so
// callers should use `makeInSet.lift` and treat None as "cannot push down".
private val makeInSet:
  PartialFunction[ParquetSchemaType, (String, Set[Any]) => FilterPredicate] = {
  case ParquetBooleanType =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      booleanColumn(name), SetInFilter(values.asInstanceOf[Set[JBoolean]]))
  case ParquetByteType | ParquetShortType | ParquetIntegerType =>
    // Byte/short/int all share the Parquet INT32 physical column.
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      intColumn(name), SetInFilter(values.asInstanceOf[Set[java.lang.Integer]]))
  case ParquetLongType =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      longColumn(name), SetInFilter(values.asInstanceOf[Set[JLong]]))
  case ParquetFloatType =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      floatColumn(name), SetInFilter(values.asInstanceOf[Set[JFloat]]))
  case ParquetDoubleType =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      doubleColumn(name), SetInFilter(values.asInstanceOf[Set[JDouble]]))

  // Binary.fromString and Binary.fromByteArray don't accept null values
  case ParquetStringType =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      binaryColumn(name),
      SetInFilter(values.map(s => Binary.fromString(s.asInstanceOf[String]))))
  case ParquetBinaryType =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      binaryColumn(name),
      SetInFilter(values.map(b => Binary.fromReusedByteArray(b.asInstanceOf[Array[Byte]]))))
  case ParquetDateType if pushDownDate =>
    // Dates are pushed down via dateToDays as the INT32 day count the column stores.
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      intColumn(name),
      SetInFilter(values.map(d => dateToDays(d.asInstanceOf[Date]).asInstanceOf[Integer])))
  case ParquetTimestampMicrosType if pushDownTimestamp =>
    // Micros-precision timestamps go through DateTimeUtils.fromJavaTimestamp
    // to obtain the INT64 representation stored in the column.
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      longColumn(name),
      SetInFilter(values.map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
        .asInstanceOf[JLong])))
  case ParquetTimestampMillisType if pushDownTimestamp =>
    // Millis-precision timestamps map directly to java.sql.Timestamp#getTime.
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      longColumn(name),
      SetInFilter(values.map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])))
  case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      intColumn(name),
      SetInFilter(values.map(d => decimalToInt32(d.asInstanceOf[JBigDecimal]))))
  case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      longColumn(name),
      SetInFilter(values.map(d => decimalToInt64(d.asInstanceOf[JBigDecimal]))))
  case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
    // Fixed-length decimals are encoded to a byte array of the declared length.
    (name: String, values: Set[Any]) => FilterApi.userDefined(
      binaryColumn(name),
      SetInFilter(values.map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length))))
}
94147
private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
95148
case ParquetBooleanType =>
96149
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[JBoolean])
@@ -376,6 +429,8 @@ private[parquet] class ParquetFilters(
376429
// Decimal type must make sure that the filter value's scale matches the file's scale.
377430
// If it doesn't match, pushing the filter down could cause data corruption.
378431
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
432+
case decimalSet: Set[JBigDecimal] =>
433+
decimalSet.iterator.next().scale == decimalMeta.getScale
379434
case decimal: JBigDecimal =>
380435
decimal.scale == decimalMeta.getScale
381436
case _ => false
@@ -385,16 +440,19 @@ private[parquet] class ParquetFilters(
385440
// in the pushed filter in order to push down the filter to Parquet.
386441
def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
387442
value == null || (nameToType(name) match {
388-
case ParquetBooleanType => value.isInstanceOf[JBoolean]
389-
case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
390-
case ParquetLongType => value.isInstanceOf[JLong]
391-
case ParquetFloatType => value.isInstanceOf[JFloat]
392-
case ParquetDoubleType => value.isInstanceOf[JDouble]
393-
case ParquetStringType => value.isInstanceOf[String]
394-
case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
395-
case ParquetDateType => value.isInstanceOf[Date]
443+
case ParquetBooleanType => value.isInstanceOf[JBoolean] ||
444+
value.isInstanceOf[Set[JBoolean]]
445+
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
446+
value.isInstanceOf[Number] || value.isInstanceOf[Set[Number]]
447+
case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Set[JLong]]
448+
case ParquetFloatType => value.isInstanceOf[JFloat] || value.isInstanceOf[Set[JFloat]]
449+
case ParquetDoubleType => value.isInstanceOf[JDouble] || value.isInstanceOf[Set[JDouble]]
450+
case ParquetStringType => value.isInstanceOf[String] || value.isInstanceOf[Set[String]]
451+
case ParquetBinaryType => value.isInstanceOf[Array[Byte]] ||
452+
value.isInstanceOf[Set[Array[Byte]]]
453+
case ParquetDateType => value.isInstanceOf[Date] || value.isInstanceOf[Set[Date]]
396454
case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
397-
value.isInstanceOf[Timestamp]
455+
value.isInstanceOf[Timestamp] || value.isInstanceOf[Set[Timestamp]]
398456
case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
399457
isDecimalMatched(value, decimalMeta)
400458
case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
@@ -470,11 +528,8 @@ private[parquet] class ParquetFilters(
470528
.map(FilterApi.not)
471529
.map(LogicalInverseRewriter.rewrite)
472530

473-
case sources.In(name, values) if canMakeFilterOn(name, values.head)
474-
&& values.distinct.length <= pushDownInFilterThreshold =>
475-
values.distinct.flatMap { v =>
476-
makeEq.lift(nameToType(name)).map(_(name, v))
477-
}.reduceLeftOption(FilterApi.or)
531+
case sources.In(name, values) if canMakeFilterOn(name, values.head) =>
532+
makeInSet.lift(nameToType(name)).map(_(name, values.toSet))
478533

479534
case sources.StringStartsWith(name, prefix)
480535
if pushDownStartWith && canMakeFilterOn(name, prefix) =>

0 commit comments

Comments
 (0)