
Commit 469ee6e

perf: Add COMET_RESPECT_PARQUET_FILTER_PUSHDOWN config (#1936)
1 parent e4f7d07 commit 469ee6e

9 files changed: +36 -15 lines changed


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
@@ -106,6 +106,16 @@ object CometConf extends ShimCometConf {
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
 
+  val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.respectFilterPushdown")
+      .doc(
+        "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be " +
+          "respected when running the Spark SQL test suite but the default setting " +
+          "results in poor performance in Comet when using the new native scans, " +
+          "disabled by default")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
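For context, enabling the new flag from an application could look like the following sketch. The three settings mirror what the test diffs below apply; the builder boilerplate itself is illustrative, not part of this commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]") // illustrative
  .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.parquet.respectFilterPushdown", "true")
  .getOrCreate()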

dev/diffs/3.4.3.diff

Lines changed: 3 additions & 2 deletions
@@ -2798,10 +2798,10 @@ index dd55fcfe42c..a1d390c93d0 100644
 
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2810,6 +2810,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +      conf
 +        .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
 +        .set("spark.comet.enabled", "true")
++        .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +      if (!isCometScanOnly) {
 +        conf

dev/diffs/3.5.6.diff

Lines changed: 3 additions & 2 deletions
@@ -2770,10 +2770,10 @@ index e937173a590..ca06132102d 100644
 
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2782,6 +2782,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +      conf
 +        .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
 +        .set("spark.comet.enabled", "true")
++        .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +      if (!isCometScanOnly) {
 +        conf

dev/diffs/4.0.0-preview1.diff

Lines changed: 3 additions & 2 deletions
@@ -3009,10 +3009,10 @@ index 5fbf379644f..d0575e1df69 100644
 
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -3021,6 +3021,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +      conf
 +        .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
 +        .set("spark.comet.enabled", "true")
++        .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +      if (!isCometScanOnly) {
 +        conf

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
+| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite but the default setting results in poor performance in Comet when using the new native scans, disabled by default | false |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
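The documented default can be confirmed at runtime; a small illustrative snippet, assuming a live SparkSession named spark:

// Falls back to the supplied default when the key is unset, so this
// returns "false" on a stock session, matching createWithDefault(false).
spark.conf.get("spark.comet.parquet.respectFilterPushdown", "false")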

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 2 additions & 1 deletion
@@ -99,7 +99,8 @@ class CometParquetFileFormat(scanImpl: String)
     val optionsMap = CaseInsensitiveMap[String](options)
     val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-    val parquetFilterPushDown = sqlConf.parquetFilterPushDown
+    val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
+      CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf)
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
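The net effect is a plain conjunction. A minimal sketch of the decision (the helper name is hypothetical, not Comet code):

// Pushdown now requires both Spark's spark.sql.parquet.filterPushdown and
// Comet's opt-in; with the new default of false, Comet skips pushdown even
// when Spark's flag is on.
def effectiveParquetPushdown(sparkEnabled: Boolean, cometRespects: Boolean): Boolean =
  sparkEnabled && cometRespects

assert(!effectiveParquetPushdown(sparkEnabled = true, cometRespects = false)) // new default
assert(effectiveParquetPushdown(sparkEnabled = true, cometRespects = true))   // test suites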

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 12 additions & 3 deletions
@@ -22,6 +22,7 @@ package org.apache.comet.serde
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.math.min
 
 import org.apache.spark.internal.Logging
@@ -2192,9 +2193,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
     // Sink operators don't have children
     result.clearChildren()
 
-    if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) {
-      // TODO remove flatMap and add error handling for unsupported data filters
-      val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output))
+    if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) &&
+      CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) {
+
+      val dataFilters = new ListBuffer[Expr]()
+      for (filter <- scan.dataFilters) {
+        exprToProto(filter, scan.output) match {
+          case Some(proto) => dataFilters += proto
+          case _ =>
+            logWarning(s"Unsupported data filter $filter")
+        }
+      }
       nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
     }
 
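The rewrite above replaces a silent flatMap with explicit handling of untranslatable filters. The same pattern in isolation, with generic names (a self-contained sketch, not Comet internals):

import scala.collection.mutable.ListBuffer

// Translate each item through a partial conversion, collecting successes
// and reporting failures instead of dropping them silently.
def translateAll[A, B](items: Seq[A])(translate: A => Option[B]): Seq[B] = {
  val translated = new ListBuffer[B]()
  for (item <- items) {
    translate(item) match {
      case Some(b) => translated += b
      case None => Console.err.println(s"Unsupported item: $item") // Comet uses logWarning
    }
  }
  translated.toList
}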

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 1 addition & 5 deletions
@@ -26,7 +26,7 @@ import java.time.{ZoneId, ZoneOffset}
 import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.control.Breaks.{break, breakable}
+import scala.util.control.Breaks.breakable
 
 import org.scalactic.source.Position
 import org.scalatest.Tag
@@ -1902,10 +1902,6 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode,
         SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) {
-        if (scanMode == CometConf.SCAN_NATIVE_DATAFUSION && !pushDown) {
-          // FIXME: native_datafusion always pushdown data filters
-          break()
-        }
         Seq(
           ("_1 = true", Math.ceil(rows.toDouble / 2)), // Boolean
           ("_2 = 1", Math.ceil(rows.toDouble / 256)), // Byte

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 1 addition & 0 deletions
@@ -78,6 +78,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
+    conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
     // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented
