Skip to content

Commit b02e76c

Browse files
yucaicloud-fan
authored and committed
[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet
## What changes were proposed in this pull request? This PR supports for pushing down filters for DateType in parquet ## How was this patch tested? Added UT and tested in local. Author: yucai <[email protected]> Closes apache#20851 from yucai/SPARK-23727.
1 parent df05fb6 commit b02e76c

File tree

3 files changed

+89
-3
lines changed

3 files changed

+89
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,13 @@ object SQLConf {
353353
.booleanConf
354354
.createWithDefault(true)
355355

356+
val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
357+
.doc("If true, enables Parquet filter push-down optimization for Date. " +
358+
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
359+
.internal()
360+
.booleanConf
361+
.createWithDefault(true)
362+
356363
val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
357364
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
358365
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1329,6 +1336,8 @@ class SQLConf extends Serializable with Logging {
13291336

13301337
def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
13311338

1339+
def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
1340+
13321341
def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
13331342

13341343
def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717

1818
package org.apache.spark.sql.execution.datasources.parquet
1919

20+
import java.sql.Date
21+
2022
import org.apache.parquet.filter2.predicate._
2123
import org.apache.parquet.filter2.predicate.FilterApi._
2224
import org.apache.parquet.io.api.Binary
2325

26+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
27+
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
28+
import org.apache.spark.sql.internal.SQLConf
2429
import org.apache.spark.sql.sources
2530
import org.apache.spark.sql.types._
2631

@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
2934
*/
3035
private[parquet] object ParquetFilters {
3136

37+
private def dateToDays(date: Date): SQLDate = {
38+
DateTimeUtils.fromJavaDate(date)
39+
}
40+
3241
private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
3342
case BooleanType =>
3443
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,10 @@ private[parquet] object ParquetFilters {
5059
(n: String, v: Any) => FilterApi.eq(
5160
binaryColumn(n),
5261
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
62+
case DateType if SQLConf.get.parquetFilterPushDownDate =>
63+
(n: String, v: Any) => FilterApi.eq(
64+
intColumn(n),
65+
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
5366
}
5467

5568
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +85,10 @@ private[parquet] object ParquetFilters {
7285
(n: String, v: Any) => FilterApi.notEq(
7386
binaryColumn(n),
7487
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
88+
case DateType if SQLConf.get.parquetFilterPushDownDate =>
89+
(n: String, v: Any) => FilterApi.notEq(
90+
intColumn(n),
91+
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
7592
}
7693

7794
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +108,10 @@ private[parquet] object ParquetFilters {
91108
case BinaryType =>
92109
(n: String, v: Any) =>
93110
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
111+
case DateType if SQLConf.get.parquetFilterPushDownDate =>
112+
(n: String, v: Any) => FilterApi.lt(
113+
intColumn(n),
114+
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
94115
}
95116

96117
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +131,10 @@ private[parquet] object ParquetFilters {
110131
case BinaryType =>
111132
(n: String, v: Any) =>
112133
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
134+
case DateType if SQLConf.get.parquetFilterPushDownDate =>
135+
(n: String, v: Any) => FilterApi.ltEq(
136+
intColumn(n),
137+
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
113138
}
114139

115140
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +154,10 @@ private[parquet] object ParquetFilters {
129154
case BinaryType =>
130155
(n: String, v: Any) =>
131156
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
157+
case DateType if SQLConf.get.parquetFilterPushDownDate =>
158+
(n: String, v: Any) => FilterApi.gt(
159+
intColumn(n),
160+
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
132161
}
133162

134163
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +177,10 @@ private[parquet] object ParquetFilters {
148177
case BinaryType =>
149178
(n: String, v: Any) =>
150179
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
180+
case DateType if SQLConf.get.parquetFilterPushDownDate =>
181+
(n: String, v: Any) => FilterApi.gtEq(
182+
intColumn(n),
183+
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
151184
}
152185

153186
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.execution.datasources.parquet
1919

2020
import java.nio.charset.StandardCharsets
21+
import java.sql.Date
2122

2223
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
2324
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,8 +77,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
7677
expected: Seq[Row]): Unit = {
7778
val output = predicate.collect { case a: Attribute => a }.distinct
7879

79-
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
80-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
80+
withSQLConf(
81+
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
82+
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
83+
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
8184
val query = df
8285
.select(output.map(e => Column(e)): _*)
8386
.where(Column(predicate))
@@ -102,7 +105,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
102105
maybeFilter.exists(_.getClass === filterClass)
103106
}
104107
checker(stripSparkFilter(query), expected)
105-
}
106108
}
107109
}
108110

@@ -313,6 +315,48 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
313315
}
314316
}
315317

318+
test("filter pushdown - date") {
319+
implicit class StringToDate(s: String) {
320+
def date: Date = Date.valueOf(s)
321+
}
322+
323+
val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
324+
325+
withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
326+
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
327+
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))
328+
329+
checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
330+
checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
331+
checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
332+
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))
333+
334+
checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
335+
checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
336+
checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
337+
checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)
338+
339+
checkFilterPredicate(
340+
Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
341+
checkFilterPredicate(
342+
Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
343+
checkFilterPredicate(
344+
Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
345+
checkFilterPredicate(
346+
Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
347+
checkFilterPredicate(
348+
Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
349+
checkFilterPredicate(
350+
Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)
351+
352+
checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
353+
checkFilterPredicate(
354+
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
355+
classOf[Operators.Or],
356+
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
357+
}
358+
}
359+
316360
test("SPARK-6554: don't push down predicates which reference partition columns") {
317361
import testImplicits._
318362

0 commit comments

Comments (0)