
Commit e02e063

Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"
This reverts commit 6a4e023.
1 parent 3dda682, commit e02e063

File tree: 4 files changed, +1 -47 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 8 deletions

@@ -552,12 +552,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
-    .internal()
-    .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
-    .booleanConf
-    .createWithDefault(true)
-
   val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
     buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
       .internal()

@@ -927,8 +921,6 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
-  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
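
For context on the deleted entry: keys created with buildConf are settable per session like any other SQL conf. A minimal sketch of toggling the flag as it existed before this revert (the app name and master are placeholders; after this commit the key no longer exists):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("subquery-reuse-toggle")
  .getOrCreate()

// Turn the (pre-revert) flag off for this session. After this commit,
// subquery reuse follows spark.sql.exchange.reuse instead.
spark.conf.set("spark.sql.subquery.reuse", "false")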

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 0 additions & 3 deletions

@@ -599,9 +599,6 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
-  // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
-
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 1 addition & 1 deletion

@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.subqueryReuseEnabled) {
+    if (!conf.exchangeReuseEnabled) {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
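
The surviving comment describes the reuse strategy: bucket candidate subqueries by schema so the expensive sameResult check only runs within a bucket instead of across all N*N pairs. A self-contained sketch of that idea, using toy Plan and sameResult stand-ins rather than Spark's real types:

import scala.collection.mutable

// Toy stand-ins for SparkPlan and its schema, for illustration only.
case class Plan(schema: String, body: String) {
  def sameResult(other: Plan): Boolean = body == other.body
}

def reuse(subqueries: Seq[Plan]): Seq[Plan] = {
  // Schema-keyed buckets: sameResult is only called against plans that
  // already share a schema, avoiding the all-pairs O(N*N) scan.
  val buckets = mutable.HashMap.empty[String, mutable.ArrayBuffer[Plan]]
  subqueries.map { plan =>
    val bucket = buckets.getOrElseUpdate(plan.schema, mutable.ArrayBuffer.empty[Plan])
    bucket.find(_.sameResult(plan)) match {
      case Some(existing) => existing          // reuse the earlier, equivalent plan
      case None           => bucket += plan; plan
    }
  }
}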

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 0 additions & 35 deletions

@@ -23,12 +23,9 @@ import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._

@@ -711,38 +708,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
   }
 
-  test("Verify spark.sql.subquery.reuse") {
-    Seq(true, false).foreach { reuse =>
-      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
-        val df = sql(
-          """
-            |SELECT key, (SELECT avg(key) FROM testData)
-            |FROM testData
-            |WHERE key > (SELECT avg(key) FROM testData)
-            |ORDER BY key
-            |LIMIT 3
-          """.stripMargin)
-
-        checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil)
-
-        val subqueries = ArrayBuffer.empty[SubqueryExec]
-        df.queryExecution.executedPlan.transformAllExpressions {
-          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
-            subqueries += plan
-            s
-        }
-
-        assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
-
-        if (reuse) {
-          assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
-        } else {
-          assert(subqueries.distinct.size == 2, "Reuse is not expected")
-        }
-      }
-    }
-  }
-
   test("cartesian product join") {
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       checkAnswer(
