Skip to content

Commit 15de6d0

Browse files
yeshengm authored and gatorsmile committed
[SPARK-28096][SQL] Convert defs to lazy vals to avoid expensive reference computation in QueryPlan and Expression
## What changes were proposed in this pull request?

The original `references` and `validConstraints` implementations in a few `QueryPlan` and `Expression` classes are methods, which means unnecessary re-computation can happen at times. This PR resolves this problem by making these methods `lazy val`s.

As shown in the following chart, the planning time (without cost-based optimization) was dramatically reduced after this optimization.

- The average planning time of TPC-DS queries was reduced by 19.63%.
- The planning time of the most time-consuming TPC-DS query (q64) was reduced by 43.03%.
- The running time of rule-based join reordering (not cost-based join reordering), which is common in real-world OLAP queries, was largely reduced.

![chart](https://user-images.githubusercontent.com/12269969/59721493-536a1200-91d6-11e9-9bfb-d7cb1e841a86.png)

Detailed stats are listed in the following spreadsheet (we warmed up each query for 5 iterations and then took the average of the next 5 iterations).

[Lazy val benchmark.xlsx](https://github.com/apache/spark/files/3303530/Lazy.val.benchmark.xlsx)

## How was this patch tested?

Existing UTs.

Closes apache#24866 from yeshengm/plannode-micro-opt.

Authored-by: Yesheng Ma <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent a5dcb82 commit 15de6d0

File tree

14 files changed

+51
-26
lines changed

14 files changed

+51
-26
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,14 @@ abstract class Expression extends TreeNode[Expression] {
113113

114114
def nullable: Boolean
115115

116-
def references: AttributeSet = AttributeSet.fromAttributeSets(children.map(_.references))
116+
/**
117+
* Works around the Scala compiler so that we can call super on lazy vals: the value is
* cached in a private lazy val and exposed through a regular method.
118+
*/
119+
@transient
120+
private lazy val _references: AttributeSet =
121+
AttributeSet.fromAttributeSets(children.map(_.references))
122+
123+
def references: AttributeSet = _references
117124

118125
/** Returns the result of evaluating this expression on a given input Row */
119126
def eval(input: InternalRow = null): Any

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ case class AggregateExpression(
127127
override def foldable: Boolean = false
128128
override def nullable: Boolean = aggregateFunction.nullable
129129

130-
override def references: AttributeSet = {
130+
@transient
131+
override lazy val references: AttributeSet = {
131132
mode match {
132133
case Partial | Complete => aggregateFunction.references
133134
case PartialMerge | Final => AttributeSet(aggregateFunction.aggBufferAttributes)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ case class Rollup(groupByExprs: Seq[Expression]) extends GroupingSet {}
9898
since = "2.0.0")
9999
// scalastyle:on line.size.limit
100100
case class Grouping(child: Expression) extends Expression with Unevaluable {
101-
override def references: AttributeSet = AttributeSet(VirtualColumn.groupingIdAttribute :: Nil)
101+
@transient
102+
override lazy val references: AttributeSet =
103+
AttributeSet(VirtualColumn.groupingIdAttribute :: Nil)
102104
override def children: Seq[Expression] = child :: Nil
103105
override def dataType: DataType = ByteType
104106
override def nullable: Boolean = false
@@ -133,7 +135,9 @@ case class Grouping(child: Expression) extends Expression with Unevaluable {
133135
since = "2.0.0")
134136
// scalastyle:on line.size.limit
135137
case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Unevaluable {
136-
override def references: AttributeSet = AttributeSet(VirtualColumn.groupingIdAttribute :: Nil)
138+
@transient
139+
override lazy val references: AttributeSet =
140+
AttributeSet(VirtualColumn.groupingIdAttribute :: Nil)
137141
override def children: Seq[Expression] = groupByExprs
138142
override def dataType: DataType = IntegerType
139143
override def nullable: Boolean = false

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ trait NamedExpression extends Expression {
109109

110110
abstract class Attribute extends LeafExpression with NamedExpression with NullIntolerant {
111111

112-
override def references: AttributeSet = AttributeSet(this)
112+
@transient
113+
override lazy val references: AttributeSet = AttributeSet(this)
113114

114115
def withNullability(newNullability: Boolean): Attribute
115116
def withQualifier(newQualifier: Seq[String]): Attribute

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
3737
/**
3838
* Returns the set of attributes that are output by this node.
3939
*/
40-
def outputSet: AttributeSet = AttributeSet(output)
40+
@transient
41+
lazy val outputSet: AttributeSet = AttributeSet(output)
4142

4243
/**
4344
* All Attributes that appear in expressions from this operator. Note that this set does not
4445
* include attributes that are implicitly referenced by being passed through to the output tuple.
4546
*/
46-
def references: AttributeSet = AttributeSet.fromAttributeSets(expressions.map(_.references))
47+
@transient
48+
lazy val references: AttributeSet =
49+
AttributeSet.fromAttributeSets(expressions.map(_.references))
4750

4851
/**
4952
* The set of all attributes that are input to this operator by its children.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ abstract class UnaryNode extends LogicalPlan {
187187
allConstraints
188188
}
189189

190-
override protected def validConstraints: Set[Expression] = child.constraints
190+
override protected lazy val validConstraints: Set[Expression] = child.constraints
191191
}
192192

193193
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ trait QueryPlanConstraints extends ConstraintHelper { self: LogicalPlan =>
5050
*
5151
* See [[Canonicalize]] for more details.
5252
*/
53-
protected def validConstraints: Set[Expression] = Set.empty
53+
protected lazy val validConstraints: Set[Expression] = Set.empty
5454
}
5555

5656
trait ConstraintHelper {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ case class ScriptTransformation(
3333
output: Seq[Attribute],
3434
child: LogicalPlan,
3535
ioschema: ScriptInputOutputSchema) extends UnaryNode {
36-
override def references: AttributeSet = AttributeSet(input.flatMap(_.references))
36+
@transient
37+
override lazy val references: AttributeSet = AttributeSet(input.flatMap(_.references))
3738
}
3839

3940
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
6565
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
6666
}
6767

68-
override def validConstraints: Set[Expression] =
68+
override lazy val validConstraints: Set[Expression] =
6969
getAllValidConstraints(projectList)
7070
}
7171

@@ -133,7 +133,7 @@ case class Filter(condition: Expression, child: LogicalPlan)
133133

134134
override def maxRows: Option[Long] = child.maxRows
135135

136-
override protected def validConstraints: Set[Expression] = {
136+
override protected lazy val validConstraints: Set[Expression] = {
137137
val predicates = splitConjunctivePredicates(condition)
138138
.filterNot(SubqueryExpression.hasCorrelatedSubquery)
139139
child.constraints.union(predicates.toSet)
@@ -178,7 +178,7 @@ case class Intersect(
178178
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
179179
}
180180

181-
override protected def validConstraints: Set[Expression] =
181+
override protected lazy val validConstraints: Set[Expression] =
182182
leftConstraints.union(rightConstraints)
183183

184184
override def maxRows: Option[Long] = {
@@ -198,7 +198,7 @@ case class Except(
198198
/** We don't use right.output because those rows get excluded from the set. */
199199
override def output: Seq[Attribute] = left.output
200200

201-
override protected def validConstraints: Set[Expression] = leftConstraints
201+
override protected lazy val validConstraints: Set[Expression] = leftConstraints
202202
}
203203

204204
/** Factory for constructing new `Union` nodes. */
@@ -294,7 +294,7 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
294294
common ++ others
295295
}
296296

297-
override protected def validConstraints: Set[Expression] = {
297+
override protected lazy val validConstraints: Set[Expression] = {
298298
children
299299
.map(child => rewriteConstraints(children.head.output, child.output, child.constraints))
300300
.reduce(merge(_, _))
@@ -326,7 +326,7 @@ case class Join(
326326
}
327327
}
328328

329-
override protected def validConstraints: Set[Expression] = {
329+
override protected lazy val validConstraints: Set[Expression] = {
330330
joinType match {
331331
case _: InnerLike if condition.isDefined =>
332332
left.constraints
@@ -729,7 +729,7 @@ case class Aggregate(
729729
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
730730
override def maxRows: Option[Long] = child.maxRows
731731

732-
override def validConstraints: Set[Expression] = {
732+
override lazy val validConstraints: Set[Expression] = {
733733
val nonAgg = aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
734734
getAllValidConstraints(nonAgg)
735735
}
@@ -828,12 +828,13 @@ case class Expand(
828828
projections: Seq[Seq[Expression]],
829829
output: Seq[Attribute],
830830
child: LogicalPlan) extends UnaryNode {
831-
override def references: AttributeSet =
831+
@transient
832+
override lazy val references: AttributeSet =
832833
AttributeSet(projections.flatten.flatMap(_.references))
833834

834835
// This operator can reuse attributes (for example making them null when doing a roll up) so
835836
// the constraints of the child may no longer be valid.
836-
override protected def validConstraints: Set[Expression] = Set.empty[Expression]
837+
override protected lazy val validConstraints: Set[Expression] = Set.empty[Expression]
837838
}
838839

839840
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ trait ObjectConsumer extends UnaryNode {
6767
assert(child.output.length == 1)
6868

6969
// This operator always needs all columns of its child, even those it doesn't reference.
70-
override def references: AttributeSet = child.outputSet
70+
@transient
71+
override lazy val references: AttributeSet = child.outputSet
7172

7273
def inputObjAttr: Attribute = child.output.head
7374
}
@@ -174,7 +175,8 @@ case class MapPartitionsInRWithArrow(
174175
output: Seq[Attribute],
175176
child: LogicalPlan) extends UnaryNode {
176177
// This operator always needs all columns of its child, even those it doesn't reference.
177-
override def references: AttributeSet = child.outputSet
178+
@transient
179+
override lazy val references: AttributeSet = child.outputSet
178180

179181
override protected def stringArgs: Iterator[Any] = Iterator(
180182
inputSchema, StructType.fromAttributes(output), child)
@@ -528,7 +530,8 @@ case class FlatMapGroupsInRWithArrow(
528530
groupingAttributes: Seq[Attribute],
529531
child: LogicalPlan) extends UnaryNode {
530532
// This operator always needs all columns of its child, even those it doesn't reference.
531-
override def references: AttributeSet = child.outputSet
533+
@transient
534+
override lazy val references: AttributeSet = child.outputSet
532535

533536
override protected def stringArgs: Iterator[Any] = Iterator(
534537
inputSchema, StructType.fromAttributes(output), keyDeserializer, groupingAttributes, child)

0 commit comments

Comments
 (0)