Skip to content

Commit 80929d6

Browse files
ulysses-you and cloud-fan
authored and committed
[SPARK-38832][SQL][FOLLOWUP] Support propagating an empty expression set for distinct keys
### What changes were proposed in this pull request? - Improve `DistinctKeyVisitor` so that it supports propagating an empty expression set - Small improvement to alias matching ### Why are the changes needed? So that distinct keys can be used to optimize more cases; see comment #36117 (comment) ### Does this PR introduce _any_ user-facing change? It improves performance ### How was this patch tested? Added tests Closes #36281 from ulysses-you/SPARK-38832-followup. Authored-by: ulysses-you <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent c83618e commit 80929d6

File tree

3 files changed

+17
-22
lines changed

3 files changed

+17
-22
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,21 @@ object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
2929
private def projectDistinctKeys(
3030
keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
3131
val outputSet = ExpressionSet(projectList.map(_.toAttribute))
32-
val aliases = projectList.filter(_.isInstanceOf[Alias])
32+
val aliases = projectList.collect {
33+
// TODO: Expand distinctKeys for redundant aliases on the same expression
34+
case alias: Alias if alias.child.deterministic => alias.child.canonicalized -> alias
35+
}.toMap
3336
if (aliases.isEmpty) {
3437
keys.filter(_.subsetOf(outputSet))
3538
} else {
36-
val aliasedDistinctKeys = keys.map { expressionSet =>
37-
expressionSet.map { expression =>
38-
expression transform {
39-
case expr: Expression =>
40-
// TODO: Expand distinctKeys for redundant aliases on the same expression
41-
aliases
42-
.collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
43-
.getOrElse(expr)
44-
}
45-
}
46-
}
39+
val aliasedDistinctKeys = keys.map(_.map(_.transform {
40+
case expr: Expression =>
41+
aliases.get(expr.canonicalized).map(_.toAttribute).getOrElse(expr)
42+
}))
4743
aliasedDistinctKeys.collect {
4844
case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
4945
} ++ keys.filter(_.subsetOf(outputSet))
50-
}.filter(_.nonEmpty)
46+
}
5147
}
5248

5349
/**
@@ -69,7 +65,8 @@ object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
6965
override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
7066

7167
override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
72-
val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
68+
// handle group by a, a and global aggregate
69+
val groupingExps = ExpressionSet(p.groupingExpressions)
7370
projectDistinctKeys(addDistinctKey(p.child.distinctKeys, groupingExps), p.aggregateExpressions)
7471
}
7572

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,6 @@ import org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED
2929
*/
3030
trait LogicalPlanDistinctKeys { self: LogicalPlan =>
3131
lazy val distinctKeys: Set[ExpressionSet] = {
32-
if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) {
33-
val keys = DistinctKeyVisitor.visit(self)
34-
require(keys.forall(_.nonEmpty))
35-
keys
36-
} else {
37-
Set.empty
38-
}
32+
if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) DistinctKeyVisitor.visit(self) else Set.empty
3933
}
4034
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ class DistinctKeyVisitorSuite extends PlanTest {
6161
checkDistinctAttributes(t1.groupBy($"a")($"a", max($"b")), Set(ExpressionSet(Seq(a))))
6262
checkDistinctAttributes(t1.groupBy($"a", $"b")($"a", $"b", d, e),
6363
Set(ExpressionSet(Seq(a, b)), ExpressionSet(Seq(d.toAttribute, e.toAttribute))))
64-
checkDistinctAttributes(t1.groupBy()(sum($"c")), Set.empty)
64+
checkDistinctAttributes(t1.groupBy()(sum($"c")), Set(ExpressionSet()))
65+
// ExpressionSet() is a subset of anything, so we do not need ExpressionSet(c2)
66+
checkDistinctAttributes(t1.groupBy()(sum($"c") as "c2").groupBy($"c2")("c2"),
67+
Set(ExpressionSet()))
68+
checkDistinctAttributes(t1.groupBy()(), Set(ExpressionSet()))
6569
checkDistinctAttributes(t1.groupBy($"a")($"a", $"a" % 10, d, sum($"b")),
6670
Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(d.toAttribute))))
6771
checkDistinctAttributes(t1.groupBy(f.child, $"b")(f, $"b", sum($"c")),

0 commit comments

Comments
 (0)