
Commit e4b36df

10110346 authored and cloud-fan committed
[SPARK-27256][CORE][SQL] If a configuration is used to set a number of bytes, use `bytesConf`.
## What changes were proposed in this pull request?

Currently, if we want to configure `spark.sql.files.maxPartitionBytes` to 256 megabytes, we must set `spark.sql.files.maxPartitionBytes=268435456`, which is very unfriendly to users. And if we set it like `spark.sql.files.maxPartitionBytes=256M`, we encounter this exception:

```
Exception in thread "main" java.lang.IllegalArgumentException: spark.sql.files.maxPartitionBytes should be long, but was 256M
	at org.apache.spark.internal.config.ConfigHelpers$.toNumber(ConfigBuilder.scala)
```

This PR replaces `longConf` or `intConf` with `bytesConf` wherever a configuration sets a number of bytes. Configurations changed:

- `spark.files.maxPartitionBytes`
- `spark.files.openCostInBytes`
- `spark.shuffle.sort.initialBufferSize`
- `spark.shuffle.spill.initialMemoryThreshold`
- `spark.sql.autoBroadcastJoinThreshold`
- `spark.sql.files.maxPartitionBytes`
- `spark.sql.files.openCostInBytes`
- `spark.sql.defaultSizeInBytes`

## How was this patch tested?

1. Existing unit tests
2. Manual testing

Closes apache#24187 from 10110346/bytesConf.

Authored-by: liuxian <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
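As a quick illustration, here is a minimal usage sketch of what the change enables (the application name and master below are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// After this patch, byte-sized configs accept unit suffixes ("256m", "256M")
// instead of requiring a raw byte count such as 268435456.
val spark = SparkSession.builder()
  .appName("bytesConf-example") // hypothetical app name
  .master("local[*]")
  .config("spark.sql.files.maxPartitionBytes", "256m")
  .getOrCreate()
```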
1 parent: 4b2b3da · commit: e4b36df

File tree: 5 files changed, +13 −12 lines


core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java — 1 addition, 1 deletion

```diff
@@ -144,7 +144,7 @@ public UnsafeShuffleWriter(
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
     this.initialSortBufferSize =
-      (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
+      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
    this.inputBufferSizeInBytes =
      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
    this.outputBufferSizeInBytes =
```
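A note on the `(int) (long)` double cast above: from Java, the value of a `bytesConf` entry comes back as a boxed `java.lang.Long`, so it must first be unboxed to `long` before narrowing to `int` (a direct `(int)` cast on the boxed value would fail at runtime). The narrowing itself is safe because the new `checkValue` on `spark.shuffle.sort.initialBufferSize` rejects values above `Int.MaxValue`.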

core/src/main/scala/org/apache/spark/internal/config/package.scala — 6 additions, 5 deletions

```diff
@@ -606,15 +606,15 @@ package object config {
 
   private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(128 * 1024 * 1024)
 
   private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
     .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
       " the same time. This is used when putting multiple files into a partition. It's better to" +
       " over estimate, then the partitions with small files will be faster than partitions with" +
       " bigger files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(4 * 1024 * 1024)
 
   private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
@@ -868,8 +868,9 @@ package object config {
   private[spark] val SHUFFLE_SORT_INIT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.sort.initialBufferSize")
       .internal()
-      .intConf
-      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => v > 0 && v <= Int.MaxValue,
+        s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)
 
   private[spark] val SHUFFLE_COMPRESS =
@@ -891,7 +892,7 @@ package object config {
       .internal()
       .doc("Initial threshold for the size of a collection before we start tracking its " +
         "memory usage.")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
       .createWithDefault(5 * 1024 * 1024)
 
   private[spark] val SHUFFLE_SPILL_BATCH_SIZE =
```
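For reference, a minimal sketch of the pattern this diff adopts. `ConfigBuilder` is `private[spark]`, so this only compiles inside Spark's own packages, and the config key below is hypothetical:

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Mirrors the SHUFFLE_SORT_INIT_BUFFER_SIZE change: the default stays a plain
// byte count, but user-supplied strings such as "4096", "4k", or "1m" parse too.
private[spark] val EXAMPLE_INIT_BUFFER_SIZE =
  ConfigBuilder("spark.example.initialBufferSize") // hypothetical key
    .internal()
    .bytesConf(ByteUnit.BYTE)
    .checkValue(v => v > 0 && v <= Int.MaxValue,
      s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
    .createWithDefault(4096)
```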

sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java — 1 addition, 1 deletion

```diff
@@ -111,7 +111,7 @@ private UnsafeExternalRowSorter(
       taskContext,
       recordComparatorSupplier,
       prefixComparator,
-      (int) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+      (int) (long) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
       pageSizeBytes,
       (int) SparkEnv.get().conf().get(
         package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala — 3 additions, 3 deletions

```diff
@@ -254,7 +254,7 @@ object SQLConf {
       "command <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been " +
       "run, and file-based data source tables where the statistics are computed directly on " +
       "the files of data.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(10L * 1024 * 1024)
 
   val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
@@ -853,7 +853,7 @@ object SQLConf {
 
   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(128 * 1024 * 1024) // parquet.block.size
 
   val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes")
@@ -1156,7 +1156,7 @@ object SQLConf {
       s"which is larger than `${AUTO_BROADCASTJOIN_THRESHOLD.key}` to be more conservative. " +
       "That is to say by default the optimizer will not choose to broadcast a table unless it " +
       "knows for sure its size is small enough.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(Long.MaxValue)
 
   val NDV_MAX_ERROR =
```
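On the SQL side, the same entries become settable with unit suffixes at runtime; a small sketch, assuming an existing `spark` session:

```scala
// Equivalent ways to set 128 MB after this change; previously only the raw
// byte count (134217728) was accepted.
spark.conf.set("spark.sql.files.maxPartitionBytes", "128m")
spark.sql("SET spark.sql.files.maxPartitionBytes=128m")
```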

sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java — 2 additions, 2 deletions

```diff
@@ -94,7 +94,7 @@ public UnsafeKVExternalSorter(
       taskContext,
       comparatorSupplier,
       prefixComparator,
-      (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+      (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
       pageSizeBytes,
       numElementsForSpillThreshold,
       canUseRadixSort);
@@ -160,7 +160,7 @@ public UnsafeKVExternalSorter(
       taskContext,
       comparatorSupplier,
       prefixComparator,
-      (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+      (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
       pageSizeBytes,
       numElementsForSpillThreshold,
       inMemSorter);
```
