
Commit 09d9338

fix: adjust CometNativeScan's doCanonicalize and hashCode for AQE, use DataSourceScanExec trait (#1578)
## Which issue does this PR close?

Addresses another failure in #1441.

## Rationale for this change

`CometExecSuite.explain native plan` fails with the `native_datafusion` experimental scan. It is an interesting query that does a self-join of two columns from the same table. The root cause is that when AQE is enabled, it reuses the shuffle output from one scan as the output of the other scan:

```
+- == Initial Plan ==
   CometProject [_1#6], [_1#6]
   +- CometSortMergeJoin [_1#6], [_2#11], Inner
      :- CometSort [_1#6], [_1#6 ASC NULLS FIRST]
      :  +- CometExchange hashpartitioning(_1#6, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=304]
      :     +- CometFilter [_1#6], isnotnull(_1#6)
      :        +- CometNativeScan: [_1#6]
      +- CometSort [_2#11], [_2#11 ASC NULLS FIRST]
         +- CometExchange hashpartitioning(_2#11, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=308]
            +- CometFilter [_2#11], isnotnull(_2#11)
               +- CometNativeScan: [_2#11]
```

AQE then incorrectly adds a `ReusedExchange` for the right side of the join that points at the left side's exchange (note the shared `plan_id=304`):

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) CometColumnarToRow
   +- CometProject [_1#6], [_1#6]
      +- CometBroadcastHashJoin [_1#6], [_2#11], Inner, BuildRight
         :- AQEShuffleRead coalesced
         :  +- ShuffleQueryStage 0
         :     +- CometExchange hashpartitioning(_1#6, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=304]
         :        +- CometFilter [_1#6], isnotnull(_1#6)
         :           +- CometNativeScan: [_1#6]
         +- BroadcastQueryStage 2
            +- CometBroadcastExchange [_2#11]
               +- AQEShuffleRead local
                  +- ShuffleQueryStage 1
                     +- ReusedExchange [_2#11], CometExchange hashpartitioning(_1#6, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=304]
```

The reason is that `hashCode()` for `CometNativeScan` was defined only over the node's output, so in the `TrieMap` used by AQE (which hashes the `SparkPlan`) both stages ended up with the same hash value after canonicalization, making AQE think that one stage could be reused for the other.

## What changes are included in this PR?

- Expand `hashCode` to include the original `FileSourceScanExec` and `serializedPlanOpt`, which carry better information about the node. I'd like to understand whether this hashes too much information and could make stages that are genuinely reusable appear distinct, but I need to dig into AQE behavior more.
- Expand `equals` to check more than just the plan output.
- Expand `doCanonicalize` based on behavior seen in the `CometScan` node. Similar to the above: I'd like to understand whether this canonicalizes the right information, but I need to dig into AQE behavior more.
- `CometNativeScan` now uses the `DataSourceScanExec` trait. The benefit here is that we get more detailed information in the Spark plan. For example, explain before (note the `CometNativeScan`):

  ```
  CometProject [_1#6], [_1#6]
  +- CometSortMergeJoin [_1#6], [_2#11], Inner
     :- CometSort [_1#6], [_1#6 ASC NULLS FIRST]
     :  +- CometExchange hashpartitioning(_1#6, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=304]
     :     +- CometFilter [_1#6], isnotnull(_1#6)
     :        +- CometNativeScan: [_1#6]
     +- CometSort [_2#11], [_2#11 ASC NULLS FIRST]
        +- CometExchange hashpartitioning(_2#11, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=308]
           +- CometFilter [_2#11], isnotnull(_2#11)
              +- CometNativeScan: [_2#11]
  ```

  and explain now (note the `CometNativeScan`):

  ```
  CometProject [_1#6], [_1#6]
  +- CometSortMergeJoin [_1#6], [_2#11], Inner
     :- CometSort [_1#6], [_1#6 ASC NULLS FIRST]
     :  +- CometExchange hashpartitioning(_1#6, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=91]
     :     +- CometFilter [_1#6], isnotnull(_1#6)
     :        +- CometNativeScan parquet [_1#6] Batched: true, DataFilters: [isnotnull(_1#6)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/12/4pf3d5zn72n7q2_0ks3bkh7c0000gn/T/spark-8f..., PartitionFilters: [], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:int>
     +- CometSort [_2#11], [_2#11 ASC NULLS FIRST]
        +- CometExchange hashpartitioning(_2#11, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=95]
           +- CometFilter [_2#11], isnotnull(_2#11)
              +- CometNativeScan parquet [_2#11] Batched: true, DataFilters: [isnotnull(_2#11)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/12/4pf3d5zn72n7q2_0ks3bkh7c0000gn/T/spark-8f..., PartitionFilters: [], PushedFilters: [IsNotNull(_2)], ReadSchema: struct<_2:int>
  ```

  This better represents the corresponding Spark plan with its `FileScan` node:

  ```
  Project [_1#6]
  +- SortMergeJoin [_1#6], [_2#11], Inner
     :- Sort [_1#6 ASC NULLS FIRST], false, 0
     :  +- Exchange hashpartitioning(_1#6, 10), ENSURE_REQUIREMENTS, [plan_id=126]
     :     +- Filter isnotnull(_1#6)
     :        +- FileScan parquet [_1#6] Batched: true, DataFilters: [isnotnull(_1#6)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/12/4pf3d5zn72n7q2_0ks3bkh7c0000gn/T/spark-8f..., PartitionFilters: [], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:int>
     +- Sort [_2#11 ASC NULLS FIRST], false, 0
        +- Exchange hashpartitioning(_2#11, 10), ENSURE_REQUIREMENTS, [plan_id=127]
           +- Filter isnotnull(_2#11)
              +- FileScan parquet [_2#11] Batched: true, DataFilters: [isnotnull(_2#11)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/12/4pf3d5zn72n7q2_0ks3bkh7c0000gn/T/spark-8f..., PartitionFilters: [], PushedFilters: [IsNotNull(_2)], ReadSchema: struct<_2:int>
  ```

- `doCanonicalize` reused a method from `CometScanExec`, so I moved it to a new common `CometScanUtils`.

## How are these changes tested?

Existing tests. Enabled one previously skipped test for `native_datafusion`.
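To make the root cause concrete, here is a minimal, self-contained Scala sketch (toy classes invented for illustration, not Comet's or Spark's actual ones) of how an output-only `equals`/`hashCode` lets a stage cache keyed on canonicalized plans conflate two different scans:

```scala
import scala.collection.concurrent.TrieMap

// Toy stand-in for a canonicalized scan node. After canonicalization,
// attribute names are normalized away, so only whatever is folded into
// equals/hashCode can tell two scans apart.
final class ToyScan(val output: Seq[String], val dataFilters: Seq[String]) {
  // Pre-fix behavior: identity is the (normalized) output only.
  override def equals(obj: Any): Boolean = obj match {
    case other: ToyScan => this.output == other.output
    case _ => false
  }
  override def hashCode(): Int = output.hashCode()
}

object ReuseDemo {
  def main(args: Array[String]): Unit = {
    // Both scans expose a single normalized column, but they read different
    // columns of the table, as the differing filters show.
    val leftScan = new ToyScan(Seq("none#0"), Seq("isnotnull(_1)"))
    val rightScan = new ToyScan(Seq("none#0"), Seq("isnotnull(_2)"))

    // AQE-style cache of created query stages, keyed by canonicalized plan.
    val stageCache = TrieMap.empty[ToyScan, String]
    stageCache.putIfAbsent(leftScan, "ShuffleQueryStage 0")
    stageCache.putIfAbsent(rightScan, "ShuffleQueryStage 1") // no-op: keys compare equal

    // Prints "ShuffleQueryStage 0": the right scan is wrongly served by the
    // left scan's stage, which is the ReusedExchange in the plan above.
    println(stageCache(rightScan))
  }
}
```

The names (`ToyScan`, `none#0`, the cache values) are made up; the real lookup lives in Spark's AQE code and keys on the canonicalized `SparkPlan`.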
1 parent 68199c2 commit 09d9338

File tree: 4 files changed (+69 additions, −20 deletions)


spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 35 additions & 7 deletions
@@ -21,9 +21,11 @@ package org.apache.spark.sql.comet
 
 import scala.reflect.ClassTag
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
@@ -53,29 +55,50 @@ case class CometNativeScanExec(
     disableBucketedScan: Boolean = false,
     originalPlan: FileSourceScanExec,
     override val serializedPlanOpt: SerializedPlan)
-    extends CometLeafExec {
+    extends CometLeafExec
+    with DataSourceScanExec {
 
-  override def nodeName: String =
-    s"${super.nodeName}: ${tableIdentifier.map(_.toString).getOrElse("")}"
+  override lazy val metadata: Map[String, String] = originalPlan.metadata
 
-  override def outputPartitioning: Partitioning =
+  override val nodeName: String =
+    s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
+
+  override lazy val outputPartitioning: Partitioning =
     UnknownPartitioning(originalPlan.inputRDD.getNumPartitions)
 
-  override def outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
+  override lazy val outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
+
+  override def doCanonicalize(): CometNativeScanExec = {
+    CometNativeScanExec(
+      nativeOp,
+      relation,
+      output.map(QueryPlan.normalizeExpressions(_, output)),
+      requiredSchema,
+      QueryPlan.normalizePredicates(
+        CometScanUtils.filterUnusedDynamicPruningExpressions(partitionFilters),
+        output),
+      optionalBucketSet,
+      optionalNumCoalescedBuckets,
+      QueryPlan.normalizePredicates(dataFilters, output),
+      None,
+      disableBucketedScan,
+      originalPlan.doCanonicalize(),
+      SerializedPlan(None))
+  }
 
   override def stringArgs: Iterator[Any] = Iterator(output)
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case other: CometNativeScanExec =>
-        this.output == other.output &&
+        this.originalPlan == other.originalPlan &&
         this.serializedPlanOpt == other.serializedPlanOpt
       case _ =>
        false
     }
   }
 
-  override def hashCode(): Int = Objects.hashCode(output)
+  override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
 
   override lazy val metrics: Map[String, SQLMetric] = {
     // We don't append CometMetricNode.baselineMetrics because
@@ -153,6 +176,11 @@ case class CometNativeScanExec(
       sparkContext,
       "Time spent reading and parsing metadata from the footer"))
   }
+
+  /**
+   * See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only used for tests.
+   */
+  override def inputRDDs(): Seq[RDD[InternalRow]] = originalPlan.inputRDDs()
 }
 
 object CometNativeScanExec extends DataTypeSupport {

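For contrast, a toy continuation of the sketch in the commit message above (again invented classes, not the real `CometNativeScanExec`): folding the underlying scan's details into `equals`/`hashCode`, as this diff does with `originalPlan` and `serializedPlanOpt`, keeps the two canonicalized scans distinct.

```scala
// Toy post-fix identity: output plus the underlying scan details (standing in
// for originalPlan and serializedPlanOpt in the real node).
final class FixedToyScan(val output: Seq[String], val dataFilters: Seq[String]) {
  override def equals(obj: Any): Boolean = obj match {
    case other: FixedToyScan =>
      this.output == other.output && this.dataFilters == other.dataFilters
    case _ => false
  }
  override def hashCode(): Int = (output, dataFilters).hashCode()
}

// new FixedToyScan(Seq("none#0"), Seq("isnotnull(_1)")) and
// new FixedToyScan(Seq("none#0"), Seq("isnotnull(_2)")) no longer compare
// equal, so an AQE-style stage cache creates two distinct shuffle stages.
```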
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 1 addition & 8 deletions
@@ -459,20 +459,13 @@ case class CometScanExec(
     }
   }
 
-  // Filters unused DynamicPruningExpression expressions - one which has been replaced
-  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
-  private def filterUnusedDynamicPruningExpressions(
-      predicates: Seq[Expression]): Seq[Expression] = {
-    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
-  }
-
   override def doCanonicalize(): CometScanExec = {
     CometScanExec(
       relation,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
       QueryPlan.normalizePredicates(
-        filterUnusedDynamicPruningExpressions(partitionFilters),
+        CometScanUtils.filterUnusedDynamicPruningExpressions(partitionFilters),
         output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,

spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression, Literal}
+
+object CometScanUtils {
+
+  /**
+   * Filters unused DynamicPruningExpression expressions - one which has been replaced with
+   * DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
+   */
+  def filterUnusedDynamicPruningExpressions(predicates: Seq[Expression]): Seq[Expression] = {
+    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+  }
+}
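A hypothetical usage sketch of the new helper (assumes Spark catalyst and Comet on the classpath; the attribute and filters are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, IsNotNull, Literal}
import org.apache.spark.sql.comet.CometScanUtils
import org.apache.spark.sql.types.IntegerType

object FilterDemo {
  def main(args: Array[String]): Unit = {
    val col = AttributeReference("_1", IntegerType)()
    // During physical planning, a dynamic pruning filter can be replaced by
    // the placeholder DynamicPruningExpression(Literal.TrueLiteral). It
    // carries no pruning information, so canonicalization should drop it.
    val partitionFilters = Seq(DynamicPruningExpression(Literal.TrueLiteral), IsNotNull(col))

    // Prints only the isnotnull(_1#...) predicate; the placeholder is gone.
    println(CometScanUtils.filterUnusedDynamicPruningExpressions(partitionFilters))
  }
}
```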

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 0 additions & 5 deletions
@@ -813,11 +813,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("explain native plan") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
-    // there are no assertions in this test to prove that the explain feature
-    // wrote the expected output to stdout, but we at least test that enabling
-    // the config does not cause any exceptions.
     withSQLConf(
       CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
