Skip to content

Commit de46df5

Browse files
ferdonline authored and gatorsmile committed
[SPARK-23997][SQL] Configurable maximum number of buckets
## What changes were proposed in this pull request? This PR implements the possibility of the user to override the maximum number of buckets when saving to a table. Currently the limit is a hard-coded 100k, which might be insufficient for large workloads. A new configuration entry is proposed: `spark.sql.bucketing.maxBuckets`, which defaults to the previous 100k. ## How was this patch tested? Added unit tests in the following spark.sql test suites: - CreateTableAsSelectSuite - BucketedWriteSuite Author: Fernando Pereira <[email protected]> Closes apache#21087 from ferdonline/enh/configurable_bucket_limit.
1 parent 1149c4e commit de46df5

File tree

4 files changed

+76
-8
lines changed

4 files changed

+76
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
3232
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
3333
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
3434
import org.apache.spark.sql.catalyst.util.quoteIdentifier
35+
import org.apache.spark.sql.internal.SQLConf
3536
import org.apache.spark.sql.types._
3637

3738

@@ -173,9 +174,12 @@ case class BucketSpec(
173174
numBuckets: Int,
174175
bucketColumnNames: Seq[String],
175176
sortColumnNames: Seq[String]) {
176-
if (numBuckets <= 0 || numBuckets >= 100000) {
177+
def conf: SQLConf = SQLConf.get
178+
179+
if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
177180
throw new AnalysisException(
178-
s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`")
181+
s"Number of buckets should be greater than 0 but less than bucketing.maxBuckets " +
182+
s"(`${conf.bucketingMaxBuckets}`). Got `$numBuckets`")
179183
}
180184

181185
override def toString: String = {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -674,6 +674,12 @@ object SQLConf {
674674
.booleanConf
675675
.createWithDefault(true)
676676

677+
val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
678+
.doc("The maximum number of buckets allowed. Defaults to 100000")
679+
.intConf
680+
.checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be larger than 0")
681+
.createWithDefault(100000)
682+
677683
val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
678684
.doc("When false, we will throw an error if a query contains a cartesian product without " +
679685
"explicit CROSS JOIN syntax.")
@@ -1803,6 +1809,8 @@ class SQLConf extends Serializable with Logging {
18031809

18041810
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
18051811

1812+
def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
1813+
18061814
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
18071815
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
18081816

sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala

Lines changed: 29 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,10 @@
1818
package org.apache.spark.sql.sources
1919

2020
import java.io.File
21-
import java.net.URI
2221

2322
import org.apache.spark.sql.{AnalysisException, QueryTest}
23+
import org.apache.spark.sql.catalyst.TableIdentifier
24+
import org.apache.spark.sql.catalyst.catalog.BucketSpec
2425
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
2526
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
2627
import org.apache.spark.sql.execution.datasources.BucketingUtils
@@ -48,16 +49,40 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
4849
intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
4950
}
5051

51-
test("numBuckets be greater than 0 but less than 100000") {
52+
test("numBuckets be greater than 0 but less/eq than default bucketing.maxBuckets (100000)") {
5253
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
5354

54-
Seq(-1, 0, 100000).foreach(numBuckets => {
55+
Seq(-1, 0, 100001).foreach(numBuckets => {
5556
val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt"))
5657
assert(
57-
e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000"))
58+
e.getMessage.contains("Number of buckets should be greater than 0 but less than"))
5859
})
5960
}
6061

62+
test("numBuckets be greater than 0 but less/eq than overridden bucketing.maxBuckets (200000)") {
63+
val maxNrBuckets: Int = 200000
64+
val catalog = spark.sessionState.catalog
65+
66+
withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) {
67+
// within the new limit
68+
Seq(100001, maxNrBuckets).foreach(numBuckets => {
69+
withTable("t") {
70+
df.write.bucketBy(numBuckets, "i").saveAsTable("t")
71+
val table = catalog.getTableMetadata(TableIdentifier("t"))
72+
assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("i"), Seq())))
73+
}
74+
})
75+
76+
// over the new limit
77+
withTable("t") {
78+
val e = intercept[AnalysisException](
79+
df.write.bucketBy(maxNrBuckets + 1, "i").saveAsTable("t"))
80+
assert(
81+
e.getMessage.contains("Number of buckets should be greater than 0 but less than"))
82+
}
83+
}
84+
}
85+
6186
test("specify sorting columns without bucketing columns") {
6287
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
6388
val e = intercept[AnalysisException] {

sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala

Lines changed: 33 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -225,7 +225,7 @@ class CreateTableAsSelectSuite
225225

226226
test("create table using as select - with invalid number of buckets") {
227227
withTable("t") {
228-
Seq(0, 100000).foreach(numBuckets => {
228+
Seq(0, 100001).foreach(numBuckets => {
229229
val e = intercept[AnalysisException] {
230230
sql(
231231
s"""
@@ -236,11 +236,42 @@ class CreateTableAsSelectSuite
236236
""".stripMargin
237237
)
238238
}.getMessage
239-
assert(e.contains("Number of buckets should be greater than 0 but less than 100000"))
239+
assert(e.contains("Number of buckets should be greater than 0 but less than"))
240240
})
241241
}
242242
}
243243

244+
test("create table using as select - with overriden max number of buckets") {
245+
def createTableSql(numBuckets: Int): String =
246+
s"""
247+
|CREATE TABLE t USING PARQUET
248+
|OPTIONS (PATH '${path.toURI}')
249+
|CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS
250+
|AS SELECT 1 AS a, 2 AS b
251+
""".stripMargin
252+
253+
val maxNrBuckets: Int = 200000
254+
val catalog = spark.sessionState.catalog
255+
withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) {
256+
257+
// Within the new limit
258+
Seq(100001, maxNrBuckets).foreach(numBuckets => {
259+
withTable("t") {
260+
sql(createTableSql(numBuckets))
261+
val table = catalog.getTableMetadata(TableIdentifier("t"))
262+
assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
263+
}
264+
})
265+
266+
// Over the new limit
267+
withTable("t") {
268+
val e = intercept[AnalysisException](sql(createTableSql(maxNrBuckets + 1)))
269+
assert(
270+
e.getMessage.contains("Number of buckets should be greater than 0 but less than "))
271+
}
272+
}
273+
}
274+
244275
test("SPARK-17409: CTAS of decimal calculation") {
245276
withTable("tab2") {
246277
withTempView("tab1") {

0 commit comments

Comments
 (0)