
Commit 36ea55e

jzhuge authored and gatorsmile committed
[SPARK-24940][SQL] Coalesce and Repartition Hint for SQL Queries
## What changes were proposed in this pull request?

Many Spark SQL users in my company have asked for a way to control the number of output files in Spark SQL. The users prefer not to use the functions repartition(n) or coalesce(n, shuffle), which require them to write and deploy Scala/Java/Python code. We propose adding the following Hive-style COALESCE and REPARTITION hints to Spark SQL:

```
... SELECT /*+ COALESCE(numPartitions) */ ...
... SELECT /*+ REPARTITION(numPartitions) */ ...
```

Multiple such hints are allowed. One node is inserted into the logical plan for each hint, and the optimizer picks the leftmost one.

```
INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t

== Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `s`, false, false
+- 'UnresolvedHint REPARTITION, [100]
   +- 'UnresolvedHint COALESCE, [500]
      +- 'UnresolvedHint COALESCE, [10]
         +- 'Project [*]
            +- 'UnresolvedRelation `t`

== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand ...
+- Repartition 100, true
   +- HiveTableRelation ...
```

## How was this patch tested?

All unit tests. Manual tests using explain.

Author: John Zhuge <[email protected]>

Closes apache#21911 from jzhuge/SPARK-24940.
1 parent 41c2227 commit 36ea55e
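
For readers skimming the change, here is a minimal usage sketch. It is hypothetical driver code, not part of this patch: the `SparkSession` setup, the app name, and the view name `t` are all assumptions used only to illustrate how the new hints are expected to behave, alongside the equivalent `Dataset.hint` call.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver, not part of this patch.
object CoalesceHintUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hint-sketch").getOrCreate()
    spark.range(0, 100, 1, 10).createOrReplaceTempView("t")

    // SQL form: controls the number of output partitions without Scala-level repartition()/coalesce().
    val coalesced = spark.sql("SELECT /*+ COALESCE(5) */ * FROM t")
    val repartitioned = spark.sql("SELECT /*+ REPARTITION(100) */ * FROM t")

    // DataFrame form: Dataset.hint builds the same UnresolvedHint node as the SQL comment syntax.
    val viaApi = spark.table("t").hint("REPARTITION", 100)

    println(coalesced.rdd.getNumPartitions)     // expected: 5
    println(repartitioned.rdd.getNumPartitions) // expected: 100
    println(viaApi.rdd.getNumPartitions)        // expected: 100
    spark.stop()
  }
}
```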

File tree

6 files changed: +134 −0 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -145,6 +145,7 @@ class Analyzer(
   lazy val batches: Seq[Batch] = Seq(
     Batch("Hints", fixedPoint,
       new ResolveHints.ResolveBroadcastHints(conf),
+      ResolveHints.ResolveCoalesceHints,
       ResolveHints.RemoveAllHints),
     Batch("Simple Sanity Check", Once,
       LookupFunctions),
```
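
A note on why the new rule is registered here rather than later: `RemoveAllHints`, the last rule in the same `Hints` batch, discards any hint that is still unresolved, so `ResolveCoalesceHints` has to run before it. The sketch below is a simplified paraphrase of that clean-up rule, not the exact Spark source.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnresolvedHint}
import org.apache.spark.sql.catalyst.rules.Rule

// Simplified paraphrase of ResolveHints.RemoveAllHints: any UnresolvedHint that survives the
// earlier hint-resolution rules is replaced by its child, i.e. the hint is silently dropped.
object RemoveAllHintsSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case h: UnresolvedHint => h.child
  }
}
```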

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 28 additions & 0 deletions
```diff
@@ -20,10 +20,12 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.IntegerType
 
 
 /**
@@ -102,6 +104,32 @@ object ResolveHints {
     }
   }
 
+  /**
+   * COALESCE Hint accepts name "COALESCE" and "REPARTITION".
+   * Its parameter includes a partition number.
+   */
+  object ResolveCoalesceHints extends Rule[LogicalPlan] {
+    private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION")
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+      case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+        val hintName = h.name.toUpperCase(Locale.ROOT)
+        val shuffle = hintName match {
+          case "REPARTITION" => true
+          case "COALESCE" => false
+        }
+        val numPartitions = h.parameters match {
+          case Seq(Literal(numPartitions: Int, IntegerType)) =>
+            numPartitions
+          case Seq(numPartitions: Int) =>
+            numPartitions
+          case _ =>
+            throw new AnalysisException(s"$hintName Hint expects a partition number as parameter")
+        }
+        Repartition(numPartitions, shuffle, h.child)
+    }
+  }
+
 /**
  * Removes all the hints, used to remove invalid hints provided by the user.
 * This must be executed after all the other hint rules are executed.
```
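
One detail worth calling out: the rule matches two parameter shapes because the two entry points wrap the argument differently. The SQL parser produces a `Literal`, while `Dataset.hint` passes the raw `Int` through (see the `DataFrameHintSuite` change below). The following is a minimal sketch, not part of the patch, assuming the catalyst classes above are on the classpath and using `LocalRelation()` as a stand-in child plan.

```scala
import org.apache.spark.sql.catalyst.analysis.ResolveHints
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Repartition, UnresolvedHint}

// Sketch only: shows that both parameter shapes resolve to the same Repartition node.
object CoalesceHintShapesSketch extends App {
  val relation = LocalRelation()

  // SQL path: /*+ COALESCE(10) */ arrives as a Literal parameter.
  val fromSql = UnresolvedHint("COALESCE", Seq(Literal(10)), relation)
  // DataFrame path: df.hint("COALESCE", 10) arrives as a raw Int parameter.
  val fromApi = UnresolvedHint("COALESCE", Seq(10), relation)

  // Either shape resolves to a non-shuffling Repartition with the requested partition count.
  assert(ResolveHints.ResolveCoalesceHints(fromSql) ==
    Repartition(numPartitions = 10, shuffle = false, child = relation))
  assert(ResolveHints.ResolveCoalesceHints(fromApi) ==
    Repartition(numPartitions = 10, shuffle = false, child = relation))
}
```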

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala

Lines changed: 35 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -120,4 +121,38 @@ class ResolveHintsSuite extends AnalysisTest {
       testRelation.where('a > 1).select('a).select('a).analyze,
       caseSensitive = false)
   }
+
+  test("coalesce and repartition hint") {
+    checkAnalysis(
+      UnresolvedHint("COALESCE", Seq(Literal(10)), table("TaBlE")),
+      Repartition(numPartitions = 10, shuffle = false, child = testRelation))
+    checkAnalysis(
+      UnresolvedHint("coalesce", Seq(Literal(20)), table("TaBlE")),
+      Repartition(numPartitions = 20, shuffle = false, child = testRelation))
+    checkAnalysis(
+      UnresolvedHint("REPARTITION", Seq(Literal(100)), table("TaBlE")),
+      Repartition(numPartitions = 100, shuffle = true, child = testRelation))
+    checkAnalysis(
+      UnresolvedHint("RePARTITion", Seq(Literal(200)), table("TaBlE")),
+      Repartition(numPartitions = 200, shuffle = true, child = testRelation))
+
+    val errMsgCoal = "COALESCE Hint expects a partition number as parameter"
+    assertAnalysisError(
+      UnresolvedHint("COALESCE", Seq.empty, table("TaBlE")),
+      Seq(errMsgCoal))
+    assertAnalysisError(
+      UnresolvedHint("COALESCE", Seq(Literal(10), Literal(false)), table("TaBlE")),
+      Seq(errMsgCoal))
+    assertAnalysisError(
+      UnresolvedHint("COALESCE", Seq(Literal(1.0)), table("TaBlE")),
+      Seq(errMsgCoal))
+
+    val errMsgRepa = "REPARTITION Hint expects a partition number as parameter"
+    assertAnalysisError(
+      UnresolvedHint("REPARTITION", Seq(UnresolvedAttribute("a")), table("TaBlE")),
+      Seq(errMsgRepa))
+    assertAnalysisError(
+      UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")),
+      Seq(errMsgRepa))
+  }
 }
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala

Lines changed: 27 additions & 0 deletions
```diff
@@ -593,6 +593,33 @@ class PlanParserSuite extends AnalysisTest {
       parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
       UnresolvedHint("MAPJOIN", Seq($"t"),
         table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
+
+    comparePlans(
+      parsePlan("SELECT /*+ COALESCE(10) */ * FROM t"),
+      UnresolvedHint("COALESCE", Seq(Literal(10)),
+        table("t").select(star())))
+
+    comparePlans(
+      parsePlan("SELECT /*+ REPARTITION(100) */ * FROM t"),
+      UnresolvedHint("REPARTITION", Seq(Literal(100)),
+        table("t").select(star())))
+
+    comparePlans(
+      parsePlan(
+        "INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t"),
+      InsertIntoTable(table("s"), Map.empty,
+        UnresolvedHint("REPARTITION", Seq(Literal(100)),
+          UnresolvedHint("COALESCE", Seq(Literal(500)),
+            UnresolvedHint("COALESCE", Seq(Literal(10)),
+              table("t").select(star())))), overwrite = false, ifPartitionNotExists = false))
+
+    comparePlans(
+      parsePlan("SELECT /*+ BROADCASTJOIN(u), REPARTITION(100) */ * FROM t"),
+      UnresolvedHint("BROADCASTJOIN", Seq($"u"),
+        UnresolvedHint("REPARTITION", Seq(Literal(100)),
+          table("t").select(star()))))
+
+    intercept("SELECT /*+ COALESCE(30 + 50) */ * FROM t", "mismatched input")
   }
 
   test("SPARK-20854: select hint syntax with expressions") {
```

sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -59,4 +59,14 @@ class DataFrameHintSuite extends AnalysisTest with SharedSQLContext {
       )
     )
   }
+
+  test("coalesce and repartition hint") {
+    check(
+      df.hint("COALESCE", 10),
+      UnresolvedHint("COALESCE", Seq(10), df.logicalPlan))
+
+    check(
+      df.hint("REPARTITION", 100),
+      UnresolvedHint("REPARTITION", Seq(100), df.logicalPlan))
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 33 additions & 0 deletions
```diff
@@ -27,6 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -2797,4 +2798,36 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row(3, 99, 1)))
     }
   }
+
+
+  test("SPARK-24940: coalesce and repartition hint") {
+    withTempView("nums1") {
+      val numPartitionsSrc = 10
+      spark.range(0, 100, 1, numPartitionsSrc).createOrReplaceTempView("nums1")
+      assert(spark.table("nums1").rdd.getNumPartitions == numPartitionsSrc)
+
+      withTable("nums") {
+        sql("CREATE TABLE nums (id INT) USING parquet")
+
+        Seq(5, 20, 2).foreach { numPartitions =>
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE nums
+               |SELECT /*+ REPARTITION($numPartitions) */ *
+               |FROM nums1
+             """.stripMargin)
+          assert(spark.table("nums").inputFiles.length == numPartitions)
+
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE nums
+               |SELECT /*+ COALESCE($numPartitions) */ *
+               |FROM nums1
+             """.stripMargin)
+          // Coalesce can not increase the number of partitions
+          assert(spark.table("nums").inputFiles.length == Seq(numPartitions, numPartitionsSrc).min)
+        }
+      }
+    }
+  }
 }
```
