
Commit a51fa9c

j-esse authored and bulldozer-bot[bot] committed
Maximum repeatedly substituted alias size (apache-spark-on-k8s#475)
https://issues.apache.org/jira/browse/SPARK-26626
apache#23556

## What changes were proposed in this pull request?

This adds a `spark.sql.maxRepeatedAliasSize` config option, which specifies the maximum size of an aliased expression that will be substituted (in CollapseProject and PhysicalOperation). This prevents large aliased expressions from being substituted multiple times and exploding the size of the expression tree, eventually OOMing the driver. The default config value of 100 was chosen through testing to find the optimally performant value:

![image](https://user-images.githubusercontent.com/17480705/51204201-dd285300-18b7-11e9-8781-dd698df00389.png)

## How was this patch tested?

Added unit tests, and did manual testing.
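For readers who want to try the knob: it is a regular SQL conf entry, so it should be tunable per session like other runtime confs. A minimal sketch (the session setup is illustrative and not part of this patch; only the config key and its default of 100 come from the diff below):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("max-repeated-alias-size-demo")
  .master("local[*]")
  .getOrCreate()

// Raise the cap on alias expressions that may be inlined more than once.
// Aliases larger than this (measured in expression-tree nodes) are left
// un-substituted rather than copied into every reference site.
spark.conf.set("spark.sql.maxRepeatedAliasSize", 200L)
```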
1 parent 8a4a29b commit a51fa9c

File tree: 6 files changed, +87 −4 lines changed

FORK.md

Lines changed: 4 additions & 1 deletion
```diff
@@ -18,11 +18,14 @@
 * core: Broadcast, CoarseGrainedExecutorBackend, CoarseGrainedSchedulerBackend, Executor, MemoryStore, SparkContext, TorrentBroadcast
 * kubernetes: ExecutorPodsAllocator, ExecutorPodsLifecycleManager, ExecutorPodsPollingSnapshotSource, ExecutorPodsSnapshot, ExecutorPodsWatchSnapshotSource, KubernetesClusterSchedulerBackend
 * yarn: YarnClusterSchedulerBackend, YarnSchedulerBackend
+
+* [SPARK-26626](https://issues.apache.org/jira/browse/SPARK-26626) - Limited the maximum size of repeatedly substituted aliases
+
 # Added
 
 * Gradle plugin to easily create custom docker images for use with k8s
 * Filter rLibDir by exists so that daemon.R references the correct file [460](https://github.com/palantir/spark/pull/460)
 
 # Reverted
 * [SPARK-25908](https://issues.apache.org/jira/browse/SPARK-25908) - Removal of `monotonicall_increasing_id`, `toDegree`, `toRadians`, `approxCountDistinct`, `unionAll`
-* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow`
+* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow`
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 24 additions & 1 deletion
```diff
@@ -649,7 +649,8 @@ object CollapseProject extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p1 @ Project(_, p2: Project) =>
-      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
+      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+          hasOversizedRepeatedAliases(p1.projectList, p2.projectList)) {
         p1
       } else {
         p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
@@ -682,6 +683,28 @@
     }.exists(!_.deterministic))
   }
 
+  private def hasOversizedRepeatedAliases(
+      upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = {
+    val aliases = collectAliases(lower)
+
+    // Count how many times each alias is used in the upper Project.
+    // If an alias is only used once, we can safely substitute it without increasing the
+    // overall tree size.
+    val referenceCounts = AttributeMap(
+      upper
+        .flatMap(_.collect { case a: Attribute => a })
+        .groupBy(identity)
+        .mapValues(_.size).toSeq
+    )
+
+    // Check for any aliases that are used more than once and are larger than the configured
+    // maximum size.
+    aliases.exists({ case (attribute, expression) =>
+      referenceCounts.getOrElse(attribute, 0) > 1 &&
+        expression.treeSize > SQLConf.get.maxRepeatedAliasSize
+    })
+  }
+
   private def buildCleanedProjectList(
       upper: Seq[NamedExpression],
       lower: Seq[NamedExpression]): Seq[NamedExpression] = {
```
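To see why this guard matters: each stacked projection like `SELECT (a + b) AS a, (a - b) AS b` references the previous layer's aliases twice, so fully inlining every alias roughly doubles the expression tree per layer. A toy sketch of that growth (standalone Scala, not Spark code; the size model is an assumption for illustration):

```scala
// Toy model: an alias of `aliasSize` nodes is referenced twice per layer, so
// inlining it everywhere roughly doubles the substituted tree each layer.
def inlinedSize(layers: Int, aliasSize: Long = 3L): Long =
  if (layers == 0) aliasSize
  else 2 * inlinedSize(layers - 1) + 1 // two copies of the lower tree plus one combining node

// Around 30 layers is already over a billion nodes; the 100-layer query in the
// new test suite would be astronomically large without the size cap.
(1 to 6).foreach(n => println(s"$n layers -> ${inlinedSize(n)} nodes"))
```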

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 28 additions & 2 deletions
```diff
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A pattern that matches any number of project or filter operations on top of another relational
@@ -58,8 +59,13 @@ object PhysicalOperation extends PredicateHelper {
     plan match {
       case Project(fields, child) if fields.forall(_.deterministic) =>
         val (_, filters, other, aliases) = collectProjectsAndFilters(child)
-        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
-        (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
+        if (hasOversizedRepeatedAliases(fields, aliases)) {
+          // Skip substitution if it could overly increase the overall tree size and risk OOMs.
+          (None, Nil, plan, Map.empty)
+        } else {
+          val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+          (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
+        }
 
       case Filter(condition, child) if condition.deterministic =>
         val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
@@ -77,6 +83,26 @@
       case a @ Alias(child, _) => a.toAttribute -> child
     }.toMap
 
+  private def hasOversizedRepeatedAliases(fields: Seq[Expression],
+      aliases: Map[Attribute, Expression]): Boolean = {
+    // Count how many times each alias is used in the fields.
+    // If an alias is only used once, we can safely substitute it without increasing the
+    // overall tree size.
+    val referenceCounts = AttributeMap(
+      fields
+        .flatMap(_.collect { case a: Attribute => a })
+        .groupBy(identity)
+        .mapValues(_.size).toSeq
+    )
+
+    // Check for any aliases that are used more than once and are larger than the configured
+    // maximum size.
+    aliases.exists({ case (attribute, expression) =>
+      referenceCounts.getOrElse(attribute, 0) > 1 &&
+        expression.treeSize > SQLConf.get.maxRepeatedAliasSize
+    })
+  }
+
   private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = {
     expr.transform {
       case a @ Alias(ref: AttributeReference, name) =>
```
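The reference-counting step in both copies of `hasOversizedRepeatedAliases` is just a `groupBy`/count over the attributes each output field mentions. A self-contained sketch of the same idea, using strings as stand-ins for catalyst `Attribute`s (toy data, not Spark code):

```scala
// Which attributes each output field references (stand-ins for Attributes).
val fieldRefs: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("a"))

// Same shape as the patch: flatten, group by identity, count occurrences.
val referenceCounts: Map[String, Int] =
  fieldRefs.flatten.groupBy(identity).mapValues(_.size).toMap

// Map(a -> 2, b -> 1): only "a" is referenced more than once, so only an
// oversized alias bound to "a" would block the substitution.
println(referenceCounts)
```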

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -87,6 +87,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 
   lazy val containsChild: Set[TreeNode[_]] = children.toSet
 
+  lazy val treeSize: Long = children.map(_.treeSize).sum + 1
+
   private lazy val _hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this)
   override def hashCode(): Int = _hashCode
```
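`treeSize` is a one-line recursion: a node's size is one plus the sizes of its children, which is the unit the new config's cap of 100 is measured in. A minimal standalone illustration (toy `Node` class, not the catalyst `TreeNode`):

```scala
// Mirrors the definition added above: size = 1 + sum of children's sizes.
case class Node(children: Seq[Node] = Nil) {
  lazy val treeSize: Long = children.map(_.treeSize).sum + 1
}

val a   = Node()                   // leaf, e.g. attribute `a`: size 1
val add = Node(Seq(a, Node()))     // a + b: size 3
val mul = Node(Seq(add, Node()))   // (a + b) * c: size 5
assert(add.treeSize == 3 && mul.treeSize == 5)
```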

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -1610,6 +1610,15 @@
         "WHERE, which does not follow SQL standard.")
       .booleanConf
       .createWithDefault(false)
+
+  val MAX_REPEATED_ALIAS_SIZE =
+    buildConf("spark.sql.maxRepeatedAliasSize")
+      .internal()
+      .doc("The maximum size of an alias expression that will be substituted multiple times " +
+        "(size defined by the number of nodes in the expression tree). " +
+        "Used by the CollapseProject optimizer and PhysicalOperation.")
+      .intConf
+      .createWithDefault(100)
 }
 
 /**
@@ -2038,6 +2047,8 @@
 
   def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)
 
+  def maxRepeatedAliasSize: Int = getConf(SQLConf.MAX_REPEATED_ALIAS_SIZE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -138,4 +138,22 @@ class CollapseProjectSuite extends PlanTest {
     assert(projects.size === 1)
     assert(hasMetadata(optimized))
   }
+
+  test("ensure oversize aliases are not repeatedly substituted") {
+    var query: LogicalPlan = testRelation
+    for (a <- 1 to 100) {
+      query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
+    }
+    val projects = Optimize.execute(query.analyze).collect { case p: Project => p }
+    assert(projects.size >= 12)
+  }
+
+  test("ensure oversize aliases are still substituted once") {
+    var query: LogicalPlan = testRelation
+    for (a <- 1 to 20) {
+      query = query.select(('a + 'b).as('a), 'b)
+    }
+    val projects = Optimize.execute(query.analyze).collect { case p: Project => p }
+    assert(projects.size === 1)
+  }
 }
```
