Commit 43da473

maryannxue and cloud-fan committed
[SPARK-27225][SQL] Implement join strategy hints
## What changes were proposed in this pull request?

This PR extends the existing BROADCAST join hint (for both broadcast-hash join and broadcast-nested-loop join) by implementing other join strategy hints corresponding to the rest of Spark's existing join strategies: shuffle-hash, sort-merge, cartesian-product. The hint names: SHUFFLE_MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL are partly different from the code names in order to make them clearer to users and reflect the actual algorithms better.

The hinted strategy will be used for the join with which it is associated if it is applicable/doable.

Conflict resolving rules in case of multiple hints:

1. Conflicts within either side of the join: take the first strategy hint specified in the query, or the top hint node in Dataset. For example, in "select /*+ merge(t1) */ /*+ broadcast(t1) */ k1, v2 from t1 join t2 on t1.k1 = t2.k2", take "merge(t1)"; in `df1.hint("merge").hint("shuffle_hash").join(df2)`, take "shuffle_hash". This is a general hint conflict resolving strategy, not specific to join strategy hint.
2. Conflicts between two sides of the join:
   a) In case of different strategy hints, hints are prioritized as `BROADCAST` over `SHUFFLE_MERGE` over `SHUFFLE_HASH` over `SHUFFLE_REPLICATE_NL`.
   b) In case of same strategy hints but conflicts in build side, choose the build side based on join type and size.

## How was this patch tested?

Added new UTs.

Closes apache#24164 from maryannxue/join-hints.

Lead-authored-by: maryannxue <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
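
Rule 2(a) of the commit message can be illustrated with a minimal standalone sketch. This is not the planner code from the commit (the strategy selection itself is not part of the files shown here); `JoinHintPrioritySketch`, `priority` and `pick` are hypothetical names, and the sketch ignores whether the chosen strategy is actually applicable to the join type.

```scala
object JoinHintPrioritySketch {
  // Highest priority first, following the order stated in rule 2(a).
  val priority: Seq[String] =
    Seq("BROADCAST", "SHUFFLE_MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL")

  // Given the (optional) strategy hinted on each side of a join, return the
  // hinted strategy with the highest priority, or None if neither side is hinted.
  def pick(left: Option[String], right: Option[String]): Option[String] =
    priority.find(s => left.contains(s) || right.contains(s))
}
```

For example, `JoinHintPrioritySketch.pick(Some("SHUFFLE_HASH"), Some("BROADCAST"))` yields `Some("BROADCAST")`, because a broadcast hint on either side outranks a shuffle-hash hint on the other.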
1 parent 239082d commit 43da473

12 files changed: +837 -252 lines changed
docs/sql-performance-tuning.md

Lines changed: 16 additions & 8 deletions
@@ -107,14 +107,22 @@ that these options will be deprecated in future release as more optimizations ar
 </tr>
 </table>
 
-## Broadcast Hint for SQL Queries
-
-The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view.
-When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred,
-even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
-When both sides of a join are specified, Spark broadcasts the one having the lower statistics.
-Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join)
-support BHJ. When the broadcast nested loop join is selected, we still respect the hint.
+## Join Strategy Hints for SQL Queries
+
+The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`,
+instruct Spark to use the hinted strategy on each specified relation when joining them with another
+relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either
+broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key)
+with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested
+by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
+
+When different join strategy hints are specified on both sides of a join, Spark prioritizes the
+`BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL`
+hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will
+pick the build side based on the join type and the sizes of the relations.
+
+Note that there is no guarantee that Spark will choose the join strategy specified in the hint since
+a specific strategy may not support all join types.
 
 <div class="codetabs">
 
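
As a hedged usage sketch for the documentation change above, the snippet below applies a join strategy hint once through a SQL comment and once through `Dataset.hint`. It assumes a Spark build that includes this commit (3.0 or later) on the classpath; the object name `JoinHintUsage`, the views `t1`/`t2` and their columns are made up for illustration. No particular physical plan is claimed, since Spark may ignore a hint that does not fit the join.

```scala
import org.apache.spark.sql.SparkSession

object JoinHintUsage {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-hints")
      .getOrCreate()

    // Two small made-up tables with join keys k1 and k2.
    val t1 = spark.range(0, 1000).selectExpr("id AS k1", "id * 2 AS v1")
    val t2 = spark.range(0, 1000).selectExpr("id AS k2", "id * 3 AS v2")
    t1.createOrReplaceTempView("t1")
    t2.createOrReplaceTempView("t2")

    // SQL form: ask for a sort-merge join involving t1.
    spark.sql("SELECT /*+ MERGE(t1) */ k1, v2 FROM t1 JOIN t2 ON t1.k1 = t2.k2").explain()

    // Dataset form: ask for a shuffle-hash join with t1 hinted.
    t1.hint("shuffle_hash").join(t2, t1("k1") === t2("k2")).explain()

    spark.stop()
  }
}
```

Inspecting the output of `explain()` is the practical way to check whether the hinted strategy was actually selected.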

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ class Analyzer(
 
   lazy val batches: Seq[Batch] = Seq(
     Batch("Hints", fixedPoint,
-      new ResolveHints.ResolveBroadcastHints(conf),
+      new ResolveHints.ResolveJoinStrategyHints(conf),
       ResolveHints.ResolveCoalesceHints,
       ResolveHints.RemoveAllHints),
     Batch("Simple Sanity Check", Once,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 65 additions & 27 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -28,45 +30,66 @@ import org.apache.spark.sql.internal.SQLConf
 
 
 /**
- * Collection of rules related to hints. The only hint currently available is broadcast join hint.
+ * Collection of rules related to hints. The only hint currently available is join strategy hint.
  *
  * Note that this is separately into two rules because in the future we might introduce new hint
- * rules that have different ordering requirements from broadcast.
+ * rules that have different ordering requirements from join strategies.
  */
 object ResolveHints {
 
   /**
-   * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
-   * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
-   * on top of any relation (that is not aliased differently), subquery, or common table expression
-   * that match the specified name.
+   * The list of allowed join strategy hints is defined in [[JoinStrategyHint.strategies]], and a
+   * sequence of relation aliases can be specified with a join strategy hint, e.g., "MERGE(a, c)",
+   * "BROADCAST(a)". A join strategy hint plan node will be inserted on top of any relation (that
+   * is not aliased differently), subquery, or common table expression that match the specified
+   * name.
    *
    * The hint resolution works by recursively traversing down the query plan to find a relation or
-   * subquery that matches one of the specified broadcast aliases. The traversal does not go past
-   * beyond any existing broadcast hints, subquery aliases.
+   * subquery that matches one of the specified relation aliases. The traversal does not go past
+   * beyond any view reference, with clause or subquery alias.
    *
    * This rule must happen before common table expressions.
    */
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
-    private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
+  class ResolveJoinStrategyHints(conf: SQLConf) extends Rule[LogicalPlan] {
+    private val STRATEGY_HINT_NAMES = JoinStrategyHint.strategies.flatMap(_.hintAliases)
 
     def resolver: Resolver = conf.resolver
 
-    private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
+    private def createHintInfo(hintName: String): HintInfo = {
+      HintInfo(strategy =
+        JoinStrategyHint.strategies.find(
+          _.hintAliases.map(
+            _.toUpperCase(Locale.ROOT)).contains(hintName.toUpperCase(Locale.ROOT))))
+    }
+
+    private def applyJoinStrategyHint(
+        plan: LogicalPlan,
+        relations: mutable.HashSet[String],
+        hintName: String): LogicalPlan = {
       // Whether to continue recursing down the tree
       var recurse = true
 
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
-          case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-            ResolvedHint(plan, HintInfo(broadcast = true))
-          case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            ResolvedHint(plan, HintInfo(broadcast = true))
+          case ResolvedHint(u: UnresolvedRelation, hint)
+              if relations.exists(resolver(_, u.tableIdentifier.table)) =>
+            relations.remove(u.tableIdentifier.table)
+            ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
+          case ResolvedHint(r: SubqueryAlias, hint)
+              if relations.exists(resolver(_, r.alias)) =>
+            relations.remove(r.alias)
+            ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
+
+          case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
+            relations.remove(u.tableIdentifier.table)
+            ResolvedHint(plan, createHintInfo(hintName))
+          case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
+            relations.remove(r.alias)
+            ResolvedHint(plan, createHintInfo(hintName))
 
           case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
-            // For an existing broadcast hint, there is no point going down (if we do, we either
-            // won't change the structure, or will introduce another broadcast hint that is useless.
+            // For an existing strategy hint, there is no chance for a match from this point down.
             // The rest (view, with, subquery) indicates different scopes that we shouldn't traverse
             // down. Note that technically when this rule is executed, we haven't completed view
             // resolution yet and as a result the view part should be deadcode. I'm leaving it here
@@ -80,25 +103,38 @@ object ResolveHints {
       }
 
       if ((plan fastEquals newNode) && recurse) {
-        newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
+        newNode.mapChildren(child => applyJoinStrategyHint(child, relations, hintName))
       } else {
         newNode
       }
     }
 
+    private def handleOverriddenHintInfo(hint: HintInfo): Unit = {
+      logWarning(s"Join hint $hint is overridden by another hint and will not take effect.")
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+      case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
-          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-          ResolvedHint(h.child, HintInfo(broadcast = true))
+          // If there is no table alias specified, apply the hint on the entire subtree.
+          ResolvedHint(h.child, createHintInfo(h.name))
         } else {
-          // Otherwise, find within the subtree query plans that should be broadcasted.
-          applyBroadcastHint(h.child, h.parameters.map {
+          // Otherwise, find within the subtree query plans to apply the hint.
+          val relationNames = h.parameters.map {
             case tableName: String => tableName
             case tableId: UnresolvedAttribute => tableId.name
-            case unsupported => throw new AnalysisException("Broadcast hint parameter should be " +
-              s"an identifier or string but was $unsupported (${unsupported.getClass}")
-          }.toSet)
+            case unsupported => throw new AnalysisException("Join strategy hint parameter " +
+              s"should be an identifier or string but was $unsupported (${unsupported.getClass}")
+          }
+          val relationNameSet = new mutable.HashSet[String]
+          relationNames.foreach(relationNameSet.add)
+
+          val applied = applyJoinStrategyHint(h.child, relationNameSet, h.name)
+          relationNameSet.foreach { n =>
+            logWarning(s"Count not find relation '$n' for join strategy hint " +
+              s"'${h.name}${relationNames.mkString("(", ", ", ")")}'.")
+          }
+          applied
         }
     }
   }
@@ -135,7 +171,9 @@ object ResolveHints {
    */
   object RemoveAllHints extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case h: UnresolvedHint => h.child
+      case h: UnresolvedHint =>
+        logWarning(s"Unrecognized hint: ${h.name}${h.parameters.mkString("(", ", ", ")")}")
+        h.child
     }
   }
 
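
The name handling in `ResolveJoinStrategyHints` can be sketched without any Spark dependency. The block below is an illustration under stated assumptions, not the rule itself: `HintAliasSketch`, `strategyFor` and `unmatched` are hypothetical names, the alias table is copied from the `hintAliases` sets in this commit, and plain case-insensitive comparison stands in for `conf.resolver`.

```scala
import java.util.Locale
import scala.collection.mutable

object HintAliasSketch {
  // Display name -> accepted hint aliases, copied from the JoinStrategyHint objects.
  val aliases: Map[String, Set[String]] = Map(
    "broadcast" -> Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN"),
    "merge" -> Set("SHUFFLE_MERGE", "MERGE", "MERGEJOIN"),
    "shuffle_hash" -> Set("SHUFFLE_HASH"),
    "shuffle_replicate_nl" -> Set("SHUFFLE_REPLICATE_NL"))

  // Case-insensitive lookup of a hint name, similar in spirit to createHintInfo.
  def strategyFor(hintName: String): Option[String] = {
    val upper = hintName.toUpperCase(Locale.ROOT)
    aliases.collectFirst { case (strategy, names) if names.contains(upper) => strategy }
  }

  // Start with every requested relation name, drop each one that matches a
  // relation found in the plan, and return the leftovers that would be warned about.
  def unmatched(requested: Seq[String], found: Seq[String]): Set[String] = {
    val remaining = mutable.HashSet.empty[String]
    requested.foreach(r => remaining.add(r))
    found.foreach { name =>
      remaining.filter(_.equalsIgnoreCase(name)).foreach(r => remaining.remove(r))
    }
    remaining.toSet
  }
}
```

With these definitions, `strategyFor("mapjoin")` resolves to the broadcast strategy, and `unmatched(Seq("t1", "t3"), Seq("T1", "t2"))` leaves only `t3`, the kind of name the new warning in `apply` reports.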

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 1 deletion
@@ -374,7 +374,7 @@ object CatalogTable {
 /**
  * This class of statistics is used in [[CatalogTable]] to interact with metastore.
  * We define this new class instead of directly using [[Statistics]] here because there are no
- * concepts of attributes or broadcast hint in catalog.
+ * concepts of attributes in catalog.
  */
 case class CatalogStatistics(
     sizeInBytes: BigInt,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala

Lines changed: 40 additions & 12 deletions
@@ -30,30 +30,58 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
     val pulledUp = plan transformUp {
       case j: Join =>
-        val leftHint = mergeHints(collectHints(j.left))
-        val rightHint = mergeHints(collectHints(j.right))
-        j.copy(hint = JoinHint(leftHint, rightHint))
+        val (newLeft, leftHints) = extractHintsFromPlan(j.left)
+        val (newRight, rightHints) = extractHintsFromPlan(j.right)
+        val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
+        j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
     pulledUp.transformUp {
-      case h: ResolvedHint => h.child
+      case h: ResolvedHint =>
+        handleInvalidHintInfo(h.hints)
+        h.child
     }
   }
 
+  /**
+   * Combine a list of [[HintInfo]]s into one [[HintInfo]].
+   */
   private def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = {
-    hints.reduceOption((h1, h2) => HintInfo(
-      broadcast = h1.broadcast || h2.broadcast))
+    hints.reduceOption((h1, h2) => h1.merge(h2, handleOverriddenHintInfo))
   }
 
-  private def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
+  /**
+   * Extract all hints from the plan, returning a list of extracted hints and the transformed plan
+   * with [[ResolvedHint]] nodes removed. The returned hint list comes in top-down order.
+   * Note that hints can only be extracted from under certain nodes. Those that cannot be extracted
+   * in this method will be cleaned up later by this rule, and may emit warnings depending on the
+   * configurations.
+   */
+  private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
     plan match {
-      case h: ResolvedHint => collectHints(h.child) :+ h.hints
-      case u: UnaryNode => collectHints(u.child)
+      case h: ResolvedHint =>
+        val (plan, hints) = extractHintsFromPlan(h.child)
+        (plan, h.hints +: hints)
+      case u: UnaryNode =>
+        val (plan, hints) = extractHintsFromPlan(u.child)
+        (u.withNewChildren(Seq(plan)), hints)
       // TODO revisit this logic:
       // except and intersect are semi/anti-joins which won't return more data then
       // their left argument, so the broadcast hint should be propagated here
-      case i: Intersect => collectHints(i.left)
-      case e: Except => collectHints(e.left)
-      case _ => Seq.empty
+      case i: Intersect =>
+        val (plan, hints) = extractHintsFromPlan(i.left)
+        (i.copy(left = plan), hints)
+      case e: Except =>
+        val (plan, hints) = extractHintsFromPlan(e.left)
+        (e.copy(left = plan), hints)
+      case p: LogicalPlan => (p, Seq.empty)
     }
   }
+
+  private def handleInvalidHintInfo(hint: HintInfo): Unit = {
+    logWarning(s"A join hint $hint is specified but it is not part of a join relation.")
+  }
+
+  private def handleOverriddenHintInfo(hint: HintInfo): Unit = {
+    logWarning(s"Join hint $hint is overridden by another hint and will not take effect.")
+  }
 }
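
The top-down extraction performed by `extractHintsFromPlan` can be illustrated on a toy plan type. This is a simplified sketch, not Catalyst code: `Plan`, `Relation`, `Filter` and `Hinted` are hypothetical stand-ins for `LogicalPlan`, a leaf relation, any `UnaryNode` and `ResolvedHint`, and hints are plain strings.

```scala
object HintExtractionSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(child: Plan) extends Plan               // stands in for any unary node
  final case class Hinted(child: Plan, hint: String) extends Plan // stands in for ResolvedHint

  // Strip hint nodes from the plan and return them in top-down order,
  // so the outermost (last applied) hint comes first.
  def extract(plan: Plan): (Plan, Seq[String]) = plan match {
    case Hinted(child, hint) =>
      val (stripped, hints) = extract(child)
      (stripped, hint +: hints)
    case Filter(child) =>
      val (stripped, hints) = extract(child)
      (Filter(stripped), hints)
    case other =>
      (other, Seq.empty)
  }
}
```

A plan built like `df1.hint("merge").filter(...).hint("shuffle_hash")` corresponds to `Hinted(Filter(Hinted(Relation("t1"), "merge")), "shuffle_hash")`; extracting from it returns the hint list with "shuffle_hash" first, which is why the top hint node wins when the list is merged left to right.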

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 86 additions & 9 deletions
@@ -66,17 +66,94 @@ object JoinHint {
 /**
  * The hint attributes to be applied on a specific node.
  *
- * @param broadcast If set to true, it indicates that the broadcast hash join is the preferred join
- *                  strategy and the node with this hint is preferred to be the build side.
+ * @param strategy The preferred join strategy.
  */
-case class HintInfo(broadcast: Boolean = false) {
+case class HintInfo(strategy: Option[JoinStrategyHint] = None) {
 
-  override def toString: String = {
-    val hints = scala.collection.mutable.ArrayBuffer.empty[String]
-    if (broadcast) {
-      hints += "broadcast"
+  /**
+   * Combine this [[HintInfo]] with another [[HintInfo]] and return the new [[HintInfo]].
+   * @param other the other [[HintInfo]]
+   * @param hintOverriddenCallback a callback to notify if any [[HintInfo]] has been overridden
+   *                               in this merge.
+   *
+   * Currently, for join strategy hints, the new [[HintInfo]] will contain the strategy in this
+   * [[HintInfo]] if defined, otherwise the strategy in the other [[HintInfo]]. The
+   * `hintOverriddenCallback` will be called if this [[HintInfo]] and the other [[HintInfo]]
+   * both have a strategy defined but the join strategies are different.
+   */
+  def merge(other: HintInfo, hintOverriddenCallback: HintInfo => Unit): HintInfo = {
+    if (this.strategy.isDefined &&
+        other.strategy.isDefined &&
+        this.strategy.get != other.strategy.get) {
+      hintOverriddenCallback(other)
     }
-
-    if (hints.isEmpty) "none" else hints.mkString("(", ", ", ")")
+    HintInfo(strategy = this.strategy.orElse(other.strategy))
   }
+
+  override def toString: String = strategy.map(s => s"(strategy=$s)").getOrElse("none")
+}
+
+sealed abstract class JoinStrategyHint {
+
+  def displayName: String
+  def hintAliases: Set[String]
+
+  override def toString: String = displayName
+}
+
+/**
+ * The enumeration of join strategy hints.
+ *
+ * The hinted strategy will be used for the join with which it is associated if doable. In case
+ * of contradicting strategy hints specified for each side of the join, hints are prioritized as
+ * BROADCAST over SHUFFLE_MERGE over SHUFFLE_HASH over SHUFFLE_REPLICATE_NL.
+ */
+object JoinStrategyHint {
+
+  val strategies: Set[JoinStrategyHint] = Set(
+    BROADCAST,
+    SHUFFLE_MERGE,
+    SHUFFLE_HASH,
+    SHUFFLE_REPLICATE_NL)
+}
+
+/**
+ * The hint for broadcast hash join or broadcast nested loop join, depending on the availability of
+ * equi-join keys.
+ */
+case object BROADCAST extends JoinStrategyHint {
+  override def displayName: String = "broadcast"
+  override def hintAliases: Set[String] = Set(
+    "BROADCAST",
+    "BROADCASTJOIN",
+    "MAPJOIN")
+}
+
+/**
+ * The hint for shuffle sort merge join.
+ */
+case object SHUFFLE_MERGE extends JoinStrategyHint {
+  override def displayName: String = "merge"
+  override def hintAliases: Set[String] = Set(
+    "SHUFFLE_MERGE",
+    "MERGE",
+    "MERGEJOIN")
+}
+
+/**
+ * The hint for shuffle hash join.
+ */
+case object SHUFFLE_HASH extends JoinStrategyHint {
+  override def displayName: String = "shuffle_hash"
+  override def hintAliases: Set[String] = Set(
+    "SHUFFLE_HASH")
+}
+
+/**
+ * The hint for shuffle-and-replicate nested loop join, a.k.a. cartesian product join.
+ */
+case object SHUFFLE_REPLICATE_NL extends JoinStrategyHint {
+  override def displayName: String = "shuffle_replicate_nl"
+  override def hintAliases: Set[String] = Set(
+    "SHUFFLE_REPLICATE_NL")
 }
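
The merge semantics of `HintInfo` (keep this side's strategy if defined, report the other side when the two differ) can be exercised in isolation. The following is a minimal standalone sketch that mirrors the `merge` method above with simplified, hypothetical types (`Hint`, `Strategy` and the lower-camel-case strategy objects); it does not depend on Spark.

```scala
object HintMergeSketch {
  sealed trait Strategy
  case object Broadcast extends Strategy
  case object ShuffleMerge extends Strategy
  case object ShuffleHash extends Strategy
  case object ShuffleReplicateNL extends Strategy

  final case class Hint(strategy: Option[Strategy] = None) {
    // Keep this hint's strategy when defined, otherwise fall back to the other one.
    // The callback fires only when both are defined and they disagree.
    def merge(other: Hint, onOverridden: Hint => Unit): Hint = {
      if (strategy.isDefined && other.strategy.isDefined && strategy != other.strategy) {
        onOverridden(other)
      }
      Hint(strategy.orElse(other.strategy))
    }
  }
}
```

Merging `Hint(Some(ShuffleHash))` with `Hint(Some(ShuffleMerge))` keeps `ShuffleHash` and passes the `ShuffleMerge` hint to the callback, which in the commit is where the "overridden by another hint" warning is logged.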
