Skip to content

Commit 2d3172d

Browse files
authored
chore: Enable plan stability suite for native_datafusion scans (#2877)
1 parent 97767c2 commit 2d3172d

File tree

1,219 files changed

+194445
-94215
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,219 files changed

+194445
-94215
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,13 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
160160
scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
161161
}
162162

163+
// Native DataFusion doesn't support subqueries/dynamic pruning
164+
if (scanImpl == SCAN_NATIVE_DATAFUSION &&
165+
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
166+
fallbackReasons += "Native DataFusion scan does not support subqueries/dynamic pruning"
167+
return withInfos(scanExec, fallbackReasons.toSet)
168+
}
169+
163170
if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
164171
fallbackReasons +=
165172
s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
6565
CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(scan.conf)) {
6666

6767
val dataFilters = new ListBuffer[Expr]()
68-
for (filter <- scan.dataFilters) {
68+
for (filter <- scan.supportedDataFilters) {
6969
exprToProto(filter, scan.output) match {
7070
case Some(proto) => dataFilters += proto
7171
case _ =>

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,16 +155,21 @@ case class CometScanExec(
155155
}
156156
}
157157

158-
@transient
159-
private lazy val pushedDownFilters = {
160-
val supportedFilters = if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {
161-
// `native_datafusion` scan does not support subquery pushdown filters,
162-
// see: https://github.com/apache/datafusion-comet/issues/2424
158+
/**
159+
* Returns the data filters that are supported for this scan implementation. For
160+
* native_datafusion scans, this excludes dynamic pruning filters (subqueries)
161+
*/
162+
lazy val supportedDataFilters: Seq[Expression] = {
163+
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {
163164
dataFilters.filterNot(isDynamicPruningFilter)
164165
} else {
165166
dataFilters
166167
}
167-
getPushedDownFilters(relation, supportedFilters)
168+
}
169+
170+
@transient
171+
private lazy val pushedDownFilters = {
172+
getPushedDownFilters(relation, supportedDataFilters)
168173
}
169174

170175
override lazy val metadata: Map[String, String] =

0 commit comments

Comments
 (0)