Commit 103f513

dongjoon-hyun authored and cloud-fan committed
[SPARK-25306][SQL] Avoid skewed filter trees to speed up createFilter in ORC
## What changes were proposed in this pull request?

In both ORC data sources, the `createFilter` function has exponential time complexity due to its skewed filter tree generation. This PR aims to improve it by using a new `buildTree` function.

**REPRODUCE**

```scala
// Create and read a 1-row table with 1000 columns
sql("set spark.sql.orc.filterPushdown=true")
val selectExpr = (1 to 1000).map(i => s"id c$i")
spark.range(1).selectExpr(selectExpr: _*).write.mode("overwrite").orc("/tmp/orc")
print(s"With 0 filters, ")
spark.time(spark.read.orc("/tmp/orc").count)

// Increase the number of filters
(20 to 30).foreach { width =>
  val whereExpr = (1 to width).map(i => s"c$i is not null").mkString(" and ")
  print(s"With $width filters, ")
  spark.time(spark.read.orc("/tmp/orc").where(whereExpr).count)
}
```

**RESULT**

```
With 0 filters, Time taken: 653 ms
With 20 filters, Time taken: 962 ms
With 21 filters, Time taken: 1282 ms
With 22 filters, Time taken: 1982 ms
With 23 filters, Time taken: 3855 ms
With 24 filters, Time taken: 6719 ms
With 25 filters, Time taken: 12669 ms
With 26 filters, Time taken: 25032 ms
With 27 filters, Time taken: 49585 ms
With 28 filters, Time taken: 98980 ms  // over 1 min 38 seconds
With 29 filters, Time taken: 198368 ms // over 3 mins
With 30 filters, Time taken: 393744 ms // over 6 mins
```

**AFTER THIS PR**

```
With 0 filters, Time taken: 774 ms
With 20 filters, Time taken: 601 ms
With 21 filters, Time taken: 399 ms
With 22 filters, Time taken: 679 ms
With 23 filters, Time taken: 363 ms
With 24 filters, Time taken: 342 ms
With 25 filters, Time taken: 336 ms
With 26 filters, Time taken: 352 ms
With 27 filters, Time taken: 322 ms
With 28 filters, Time taken: 302 ms
With 29 filters, Time taken: 307 ms
With 30 filters, Time taken: 301 ms
```

## How was this patch tested?

Pass Jenkins with the newly added test cases.

Closes apache#22313 from dongjoon-hyun/SPARK-25306.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
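To make the fix concrete, here is a minimal, self-contained sketch (illustrative only: `TreeDepthSketch` and `depth` are names invented for this example, and `buildTree` below mirrors the shape of the function added by this patch) comparing the depth of the conjunction built by the old `reduceOption(And)` strategy with the balanced tree built by `buildTree`:

```scala
import org.apache.spark.sql.sources.{And, Filter, IsNotNull}

object TreeDepthSketch {
  // Depth of a Filter tree, counting a leaf predicate as 1.
  def depth(f: Filter): Int = f match {
    case And(l, r) => 1 + math.max(depth(l), depth(r))
    case _         => 1
  }

  // Same shape as the buildTree added by this patch: split the filter list
  // in half and conjoin the two balanced subtrees.
  def buildTree(filters: Seq[Filter]): Option[Filter] = filters match {
    case Seq()                 => None
    case Seq(filter)           => Some(filter)
    case Seq(filter1, filter2) => Some(And(filter1, filter2))
    case _ =>
      val (left, right) = filters.splitAt(filters.length / 2)
      Some(And(buildTree(left).get, buildTree(right).get))
  }

  def main(args: Array[String]): Unit = {
    val filters: Seq[Filter] = (1 to 30).map(i => IsNotNull(s"c$i"))
    // Old strategy: a left-deep chain whose depth equals the filter count.
    println(depth(filters.reduceOption(And).get)) // 30
    // New strategy: a balanced tree of logarithmic depth.
    println(depth(buildTree(filters).get))        // 6
  }
}
```

Each extra level of the left-deep chain is another level the recursive `SearchArgument` conversion has to revisit, which is where the exponential timings above come from; the balanced tree caps the depth at roughly log2 of the filter count.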
1 parent 061bb01 · commit 103f513

File tree

4 files changed: +82 -21 lines changed

sql/core/benchmarks/FilterPushdownBenchmark-results.txt (34 additions, 0 deletions)

```diff
@@ -702,3 +702,37 @@ Parquet Vectorized (Pushdown) 11766 / 11927 1.3 7
 Native ORC Vectorized                        12101 / 12301          1.3         769.3       1.0X
 Native ORC Vectorized (Pushdown)             11983 / 12651          1.3         761.9       1.0X
 
+
+================================================================================================
+Pushdown benchmark with many filters
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 1 row with 1 filters:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                             158 /  182          0.0   158442969.0       1.0X
+Parquet Vectorized (Pushdown)                  150 /  158          0.0   149718289.0       1.1X
+Native ORC Vectorized                          141 /  148          0.0   141259852.0       1.1X
+Native ORC Vectorized (Pushdown)               142 /  147          0.0   142016472.0       1.1X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 1 row with 250 filters:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            1013 / 1026          0.0  1013194322.0       1.0X
+Parquet Vectorized (Pushdown)                 1326 / 1332          0.0  1326301956.0       0.8X
+Native ORC Vectorized                         1005 / 1010          0.0  1005266379.0       1.0X
+Native ORC Vectorized (Pushdown)              1068 / 1071          0.0  1067964993.0       0.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 1 row with 500 filters:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            3598 / 3614          0.0  3598001202.0       1.0X
+Parquet Vectorized (Pushdown)                 4282 / 4333          0.0  4281849770.0       0.8X
+Native ORC Vectorized                         3594 / 3619          0.0  3593551548.0       1.0X
+Native ORC Vectorized (Pushdown)              3834 / 3840          0.0  3834240570.0       0.9X
```

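A note on reading these tables (an editor's gloss, not part of the results file): each query selects a single row, so `Per Row(ns)` is effectively the whole query time in nanoseconds and `Rate(M/s)` rounds to 0.0; the headline is that the pushdown rows now stay within roughly 10% of their non-pushdown counterparts even at 500 filters. A quick sanity check of the first Parquet row:

```scala
// Values taken from the "Select 1 row with 1 filters" Parquet Vectorized row.
val bestTimeMs = 158.0
val numRows    = 1L

// Per Row(ns) = best time / row count; 158 ms over one row is about 1.58e8 ns,
// matching the table's 158442969.0 up to measurement precision.
val perRowNs = bestTimeMs * 1e6 / numRows

// Rate(M/s) = rows per second, in millions; one row in 0.158 s is ~6.3e-6 M/s,
// which prints as 0.0.
val rateMps = numRows / (bestTimeMs / 1000) / 1e6
```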
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala (17 additions, 8 deletions)

```diff
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory}
+import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
+import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{And, Filter}
 import org.apache.spark.sql.types._
 
 /**
@@ -54,7 +55,17 @@ import org.apache.spark.sql.types._
  * builder methods mentioned above can only be found in test code, where all tested filters are
  * known to be convertible.
  */
-private[orc] object OrcFilters {
+private[sql] object OrcFilters {
+  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
+    filters match {
+      case Seq() => None
+      case Seq(filter) => Some(filter)
+      case Seq(filter1, filter2) => Some(And(filter1, filter2))
+      case _ => // length > 2
+        val (left, right) = filters.splitAt(filters.length / 2)
+        Some(And(buildTree(left).get, buildTree(right).get))
+    }
+  }
 
   /**
    * Create ORC filter as a SearchArgument instance.
@@ -66,14 +77,14 @@ private[orc] object OrcFilters {
     // collect all convertible ones to build the final `SearchArgument`.
     val convertibleFilters = for {
       filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
+      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
     } yield filter
 
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
+      conjunction <- buildTree(convertibleFilters)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
+      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
     } yield builder.build()
   }
 
@@ -127,8 +138,6 @@ private[orc] object OrcFilters {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def newBuilder = SearchArgumentFactory.newBuilder()
-
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
```

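Why balancing fixes an exponential blow-up rather than a merely linear one: at the time, the `And` case of `buildSearchArgument` (not shown in this diff) trial-built each child with a fresh builder to check convertibility before building the real conjunction, which is what the removed local `def newBuilder` existed for, so every subtree gets rebuilt at each of its ancestors. A rough cost model under that assumption, as a hedged sketch rather than the actual Spark code:

```scala
import org.apache.spark.sql.sources.{And, Filter, IsNotNull}

// Assumed cost model: each And node builds both children twice
// (once as a convertibility trial, once for real).
def cost(f: Filter): Long = f match {
  case And(l, r) => 2 * (cost(l) + cost(r))
  case _         => 1
}

val filters: Seq[Filter] = (1 to 30).map(i => IsNotNull(s"c$i"))

// Left-deep chain from reduceOption(And): cost(n) = 2 * (cost(n - 1) + 1),
// so 30 filters cost 3 * 2^29 - 2, about 1.6e9 units.
println(cost(filters.reduce(And)))

// A balanced tree over the same filters satisfies T(n) = 4 * T(n / 2),
// i.e. O(n^2); the same model gives only 928 units for 30 filters.
```

Under this model the chain roughly doubles in cost with every added filter, matching the REPRODUCE timings in the commit message, while the balanced tree stays quadratic at worst.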
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala (25 additions, 7 deletions)

```diff
@@ -242,7 +242,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   ignore("Pushdown for many distinct value case") {
     withTempPath { dir =>
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         Seq(true, false).foreach { useStringForValue =>
           prepareTable(dir, numRows, width, useStringForValue)
           if (useStringForValue) {
@@ -259,7 +259,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
     withTempPath { dir =>
       val numDistinctValues = 200
 
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         prepareStringDictTable(dir, numRows, numDistinctValues, width)
         runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
       }
@@ -268,7 +268,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   ignore("Pushdown benchmark for StringStartsWith") {
     withTempPath { dir =>
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         prepareTable(dir, numRows, width, true)
         Seq(
           "value like '10%'",
@@ -296,7 +296,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
         monotonically_increasing_id()
       }
       val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         saveAsTable(df, dir)
 
         Seq(s"value = $mid").foreach { whereExpr =>
@@ -320,7 +320,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   ignore("Pushdown benchmark for InSet -> InFilters") {
     withTempPath { dir =>
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         prepareTable(dir, numRows, width, false)
         Seq(5, 10, 50, 100).foreach { count =>
           Seq(10, 50, 90).foreach { distribution =>
@@ -341,7 +341,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       val df = spark.range(numRows).selectExpr(columns: _*)
         .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
         .orderBy("value")
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         saveAsTable(df, dir)
 
         Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
@@ -373,7 +373,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
       val df = spark.range(numRows).selectExpr(columns: _*)
         .withColumn("value", monotonically_increasing_id().cast(TimestampType))
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         saveAsTable(df, dir)
 
         Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
@@ -398,6 +398,24 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       }
     }
   }
+
+  test(s"Pushdown benchmark with many filters") {
+    val numRows = 1
+    val width = 500
+
+    withTempPath { dir =>
+      val columns = (1 to width).map(i => s"id c$i")
+      val df = spark.range(1).selectExpr(columns: _*)
+      withTempTable("orcTable", "parquetTable") {
+        saveAsTable(df, dir)
+        Seq(1, 250, 500).foreach { numFilter =>
+          val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
+          // Note: InferFiltersFromConstraints will add more filters to this given filters
+          filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
+        }
+      }
+    }
+  }
 }
 
 trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>
```
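Note that the new benchmark is registered with `test(...)` rather than `ignore(...)` like its siblings in this suite, so it runs on every test pass; that is presumably viable only because the balanced tree keeps even the 500-filter case fast, where the old exponential behavior (over six minutes at just 30 filters, per the commit message) would have made it unrunnable.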

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala (6 additions, 6 deletions)

```diff
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.hive.orc
 
-import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -62,23 +64,21 @@ private[orc] object OrcFilters extends Logging {
     // collect all convertible ones to build the final `SearchArgument`.
     val convertibleFilters = for {
       filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
+      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
     } yield filter
 
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- convertibleFilters.reduceOption(And)
+      conjunction <- buildTree(convertibleFilters)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
+      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
     } yield builder.build()
   }
 
   private def buildSearchArgument(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def newBuilder = SearchArgumentFactory.newBuilder()
-
     def isSearchableType(dataType: DataType): Boolean = dataType match {
       // Only the values in the Spark types below can be recognized by
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
```
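A design note on this second copy: rather than duplicating the new logic, the Hive ORC reader imports `buildTree` from `org.apache.spark.sql.execution.datasources.orc.OrcFilters`, which is also why that object's visibility is widened from `private[orc]` to `private[sql]` in the hunk further above.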
