Commit 68034a8

dbaliafroozeh authored and hvanhovell committed
[SPARK-30072][SQL] Create dedicated planner for subqueries
### What changes were proposed in this pull request?

This PR changes subquery planning by calling the planner and plan preparation rules on the subquery plan directly. Before, we were creating a `QueryExecution` instance for subqueries to get the `executedPlan`. This would re-run analysis and optimization on the subquery's plan. Running the analysis again on an optimized query plan can have unwanted consequences, as some rules, for example `DecimalPrecision`, are not idempotent.

As an example, consider the expression `1.7 * avg(a)`, which after applying the `DecimalPrecision` rule becomes:

```
promote_precision(1.7) * promote_precision(avg(a))
```

After the optimization, more specifically the constant folding rule, this expression becomes:

```
1.7 * promote_precision(avg(a))
```

Now if we run the analyzer on this optimized query again, we will get:

```
promote_precision(1.7) * promote_precision(promote_precision(avg(a)))
```

which will later be optimized to:

```
1.7 * promote_precision(promote_precision(avg(a)))
```

As can be seen, re-running the analysis and optimization on this expression results in extra nested `promote_precision` nodes. Adding unneeded nodes to the plan is problematic because it can eliminate situations where we can reuse the plan. We opted to introduce dedicated planners for subqueries, instead of making the `DecimalPrecision` rule idempotent, because this eliminates this entire category of problems. Another benefit is that planning time for subqueries is reduced.

### How was this patch tested?

Unit tests

Closes apache#26705 from dbaliafroozeh/CreateDedicatedPlannerForSubqueries.

Authored-by: Ali Afroozeh <[email protected]>
Signed-off-by: herman <[email protected]>
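
To make the change concrete, here is a hedged sketch (not code from this commit) of the planning path for an already-optimized subquery plan, using the `QueryExecution.prepareExecutedPlan` helper introduced below; `spark` and `subqueryPlan` are assumed stand-ins:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

// Sketch: plan a subquery whose logical plan has already been analyzed and optimized.
def planSubquery(spark: SparkSession, subqueryPlan: LogicalPlan): SparkPlan = {
  // Old path: new QueryExecution(spark, subqueryPlan).executedPlan would re-run analysis
  // and optimization on the already-optimized plan (re-applying DecimalPrecision, etc.).
  // New path: go straight from the optimized logical plan to a prepared physical plan.
  QueryExecution.prepareExecutedPlan(spark, subqueryPlan)
}
```
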
1 parent e04a634 commit 68034a8

File tree

11 files changed: +134, -56 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -246,7 +246,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
     }
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case s: SubqueryExpression =>
-        val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
+        val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
         // At this point we have an optimized subquery plan that we are going to attach
         // to this subquery expression. Here we can safely remove any top level sort
         // in the plan as tuples produced by a subquery are un-ordered.
@@ -377,8 +377,8 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     plan match {
       // We want to keep the same output attributes for subqueries. This means we cannot remove
       // the aliases that produce these attributes
-      case Subquery(child) =>
-        Subquery(removeRedundantAliases(child, blacklist ++ child.outputSet))
+      case Subquery(child, correlated) =>
+        Subquery(removeRedundantAliases(child, blacklist ++ child.outputSet), correlated)
 
       // A join has to be treated differently, because the left and the right side of the join are
       // not allowed to use the same attributes. We use a blacklist to prevent us from creating a
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -45,11 +45,19 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
 /**
  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware transformations.
+ *
+ * @param correlated flag that indicates the subquery is correlated, and will be rewritten into a
+ *                   join during analysis.
  */
-case class Subquery(child: LogicalPlan) extends OrderPreservingUnaryNode {
+case class Subquery(child: LogicalPlan, correlated: Boolean) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+object Subquery {
+  def fromExpression(s: SubqueryExpression): Subquery =
+    Subquery(s.plan, SubqueryExpression.hasCorrelatedSubquery(s))
+}
+
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
```
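
For illustration, a minimal sketch (hypothetical rule name, not part of this commit) of how a rule can use the new `correlated` flag to skip subqueries that will be rewritten into joins and revisited as part of the main plan, mirroring the `case s: Subquery if s.correlated` pattern used by `PartitionPruning` and `ExtractPythonUDFs` later in this diff:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: correlated subqueries are rewritten into joins and flow through the
// main plan later, so they are left untouched here.
object SkipCorrelatedSubqueries extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case s: Subquery if s.correlated => plan // skip correlated subqueries
    case _ => plan // a real rule would transform the remaining plans here
  }
}
```
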

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -119,9 +119,13 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
 
   test("do not remove output attributes from a subquery") {
     val relation = LocalRelation('a.int, 'b.int)
-    val query = Subquery(relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze)
+    val query = Subquery(
+      relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze,
+      correlated = false)
     val optimized = Optimize.execute(query)
-    val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze)
+    val expected = Subquery(
+      relation.select('a as "a", 'b).where('b < 10).select('a).analyze,
+      correlated = false)
     comparePlans(optimized, expected)
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -252,7 +252,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // Do not rewrite subqueries.
-    case _: Subquery => plan
+    case s: Subquery if s.correlated => plan
     case _ if !SQLConf.get.dynamicPartitionPruningEnabled => plan
     case _ => prune(plan)
   }
```

sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -55,22 +55,24 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
     plan transformAllExpressions {
       case DynamicPruningSubquery(
           value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
-        val qe = new QueryExecution(sparkSession, buildPlan)
+        val sparkPlan = QueryExecution.createSparkPlan(
+          sparkSession, sparkSession.sessionState.planner, buildPlan)
         // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
         val canReuseExchange = reuseBroadcast && buildKeys.nonEmpty &&
           plan.find {
             case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
-              left.sameResult(qe.sparkPlan)
+              left.sameResult(sparkPlan)
             case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
-              right.sameResult(qe.sparkPlan)
+              right.sameResult(sparkPlan)
             case _ => false
           }.isDefined
 
         if (canReuseExchange) {
           val mode = broadcastMode(buildKeys, buildPlan)
+          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
           // plan a broadcast exchange of the build side of the join
-          val exchange = BroadcastExchangeExec(mode, qe.executedPlan)
+          val exchange = BroadcastExchangeExec(mode, executedPlan)
           val name = s"dynamicpruning#${exprId.id}"
           // place the broadcast adaptor for reusing the broadcast results on the probe side
           val broadcastValues =
```

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 68 additions & 25 deletions
```diff
@@ -81,19 +81,16 @@ class QueryExecution(
   }
 
   lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
-    SparkSession.setActiveSession(sparkSession)
-    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
-    // but we will implement to choose the best plan.
     // Clone the logical plan here, in case the planner rules change the states of the logical plan.
-    planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
+    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
   }
 
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
   lazy val executedPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
     // clone the plan to avoid sharing the plan instance between different stages like analyzing,
     // optimizing and planning.
-    prepareForExecution(sparkPlan.clone())
+    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
   }
 
   /**
@@ -109,28 +106,10 @@ class QueryExecution(
   lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
     executedPlan.execute(), sparkSession.sessionState.conf)
 
-  /**
-   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
-   * row format conversions as needed.
-   */
-  protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
+  protected def preparations: Seq[Rule[SparkPlan]] = {
+    QueryExecution.preparations(sparkSession)
   }
 
-  /** A sequence of rules that will be applied in order to the physical plan before execution. */
-  protected def preparations: Seq[Rule[SparkPlan]] = Seq(
-    // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
-    // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
-    InsertAdaptiveSparkPlan(sparkSession, this),
-    PlanDynamicPruningFilters(sparkSession),
-    PlanSubqueries(sparkSession),
-    EnsureRequirements(sparkSession.sessionState.conf),
-    ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
-      sparkSession.sessionState.columnarRules),
-    CollapseCodegenStages(sparkSession.sessionState.conf),
-    ReuseExchange(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf))
-
   def simpleString: String = simpleString(false)
 
   def simpleString(formatted: Boolean): String = withRedaction {
@@ -248,3 +227,67 @@ class QueryExecution(
     }
   }
 }
+
+object QueryExecution {
+  /**
+   * Construct a sequence of rules that are used to prepare a planned [[SparkPlan]] for execution.
+   * These rules will make sure subqueries are planned, make use the data partitioning and ordering
+   * are correct, insert whole stage code gen, and try to reduce the work done by reusing exchanges
+   * and subqueries.
+   */
+  private[execution] def preparations(sparkSession: SparkSession): Seq[Rule[SparkPlan]] =
+    Seq(
+      // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
+      // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
+      InsertAdaptiveSparkPlan(sparkSession),
+      PlanDynamicPruningFilters(sparkSession),
+      PlanSubqueries(sparkSession),
+      EnsureRequirements(sparkSession.sessionState.conf),
+      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
+        sparkSession.sessionState.columnarRules),
+      CollapseCodegenStages(sparkSession.sessionState.conf),
+      ReuseExchange(sparkSession.sessionState.conf),
+      ReuseSubquery(sparkSession.sessionState.conf)
+    )
+
+  /**
+   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
+   * row format conversions as needed.
+   */
+  private[execution] def prepareForExecution(
+      preparations: Seq[Rule[SparkPlan]],
+      plan: SparkPlan): SparkPlan = {
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
+  }
+
+  /**
+   * Transform a [[LogicalPlan]] into a [[SparkPlan]].
+   *
+   * Note that the returned physical plan still needs to be prepared for execution.
+   */
+  def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan = {
+    SparkSession.setActiveSession(sparkSession)
+    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
+    // but we will implement to choose the best plan.
+    planner.plan(ReturnAnswer(plan)).next()
+  }
+
+  /**
+   * Prepare the [[SparkPlan]] for execution.
+   */
+  def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {
+    prepareForExecution(preparations(spark), plan)
+  }
+
+  /**
+   * Transform the subquery's [[LogicalPlan]] into a [[SparkPlan]] and prepare the resulting
+   * [[SparkPlan]] for execution.
+   */
+  def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {
+    val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())
+    prepareExecutedPlan(spark, sparkPlan)
+  }
+}
```
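
Note how planning and preparation are now exposed as two separate steps. A hedged usage sketch of that decomposition follows, mirroring the pattern `PlanDynamicPruningFilters` uses above; the helper name and inputs are assumed, and the code is presumed to live inside Spark's `org.apache.spark.sql.execution` package since `sessionState` is package-private:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

// Sketch: plan the build side of a filter in two steps (assumes sql-internal access).
def planBuildSide(spark: SparkSession, buildPlan: LogicalPlan): SparkPlan = {
  // Step 1: create the raw physical plan, usable for sameResult comparisons against
  // existing broadcast exchanges, without running the preparation rules yet.
  val sparkPlan = QueryExecution.createSparkPlan(spark, spark.sessionState.planner, buildPlan)
  // Step 2: only when the plan will actually be executed, apply the preparation rules.
  QueryExecution.prepareExecutedPlan(spark, sparkPlan)
}
```
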

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 7 additions & 9 deletions
```diff
@@ -63,7 +63,7 @@ case class AdaptiveSparkPlanExec(
     @transient preprocessingRules: Seq[Rule[SparkPlan]],
     @transient subqueryCache: TrieMap[SparkPlan, BaseSubqueryExec],
     @transient stageCache: TrieMap[SparkPlan, QueryStageExec],
-    @transient queryExecution: QueryExecution)
+    @transient isSubquery: Boolean)
   extends LeafExecNode {
 
   @transient private val lock = new Object()
@@ -128,14 +128,12 @@
 
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (!isFinalPlan) {
-      // Make sure we only update Spark UI if this plan's `QueryExecution` object matches the one
-      // retrieved by the `sparkContext`'s current execution ID. Note that sub-queries do not have
-      // their own execution IDs and therefore rely on the main query to update UI.
-      val executionId = Option(
-        session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).flatMap { idStr =>
-        val id = idStr.toLong
-        val qe = SQLExecution.getQueryExecution(id)
-        if (qe.eq(queryExecution)) Some(id) else None
+      // Subqueries do not have their own execution IDs and therefore rely on the main query to
+      // update UI.
+      val executionId = if (isSubquery) {
+        None
+      } else {
+        Option(session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
       }
       var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
       var result = createQueryStages(currentPhysicalPlan)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 7 additions & 8 deletions
```diff
@@ -35,9 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
  *
  * Note that this rule is stateful and thus should not be reused across query executions.
  */
-case class InsertAdaptiveSparkPlan(
-    session: SparkSession,
-    queryExecution: QueryExecution) extends Rule[SparkPlan] {
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {
 
   private val conf = session.sessionState.conf
 
@@ -47,9 +45,9 @@ case class InsertAdaptiveSparkPlan(
   // Exchange-reuse is shared across the entire query, including sub-queries.
   private val stageCache = new TrieMap[SparkPlan, QueryStageExec]()
 
-  override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, queryExecution)
+  override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
 
-  private def applyInternal(plan: SparkPlan, qe: QueryExecution): SparkPlan = plan match {
+  private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
     case _: ExecutedCommandExec => plan
     case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) =>
       try {
@@ -62,7 +60,8 @@ case class InsertAdaptiveSparkPlan(
         // Run pre-processing rules.
         val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
         logDebug(s"Adaptive execution enabled for plan: $plan")
-        AdaptiveSparkPlanExec(newPlan, session, preprocessingRules, subqueryCache, stageCache, qe)
+        AdaptiveSparkPlanExec(newPlan, session, preprocessingRules,
+          subqueryCache, stageCache, isSubquery)
       } catch {
         case SubqueryAdaptiveNotSupportedException(subquery) =>
           logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
@@ -110,10 +109,10 @@ case class InsertAdaptiveSparkPlan(
   }
 
   def compileSubquery(plan: LogicalPlan): SparkPlan = {
-    val queryExec = new QueryExecution(session, plan)
     // Apply the same instance of this rule to sub-queries so that sub-queries all share the
     // same `stageCache` for Exchange reuse.
-    this.applyInternal(queryExec.sparkPlan, queryExec)
+    this.applyInternal(
+      QueryExecution.createSparkPlan(session, session.sessionState.planner, plan.clone()), true)
   }
 
   private def verifyAdaptivePlan(plan: SparkPlan, logicalPlan: LogicalPlan): Unit = {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -205,7 +205,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
     // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
-    case _: Subquery => plan
+    case s: Subquery if s.correlated => plan
 
     case _ => plan transformUp {
       // A safe guard. `ExtractPythonUDFs` only runs once, so we will not hit `BatchEvalPython` and
```

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -178,7 +178,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressions {
       case subquery: expressions.ScalarSubquery =>
-        val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
+        val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, subquery.plan)
         ScalarSubquery(
           SubqueryExec(s"scalar-subquery#${subquery.exprId.id}", executedPlan),
           subquery.exprId)
@@ -192,7 +192,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
             }
           )
         }
-      val executedPlan = new QueryExecution(sparkSession, query).executedPlan
+      val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, query)
       InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
   }
 }
```
