
Commit 47a668c

[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue
JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never try to allocate an array that big. This commit changes some defaults and configs so that they gracefully fall back to something that doesn't require one large array in some cases; in other cases it simply improves the error message in cases that will still fail.

Closes apache#22818 from squito/SPARK-25827.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
(cherry picked from commit 8fbc183)
1 parent 947462f commit 47a668c
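
For context, a minimal Scala sketch (not part of the commit) of the idea behind the change: per the commit message, a JVM cannot actually hand out an array of length exactly Int.MaxValue, so Spark caps sizes at ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, which the new error messages in the diff describe as Int.MaxValue - 15. The object and method names below are illustrative only.

object ArrayLimits {
  // Assumed value of ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, per the
  // "Int.MaxValue - 15" wording in the updated error messages below.
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Clamp a requested byte count (possibly outside Int range) to a length
  // that a single JVM array can actually hold.
  def safeArraySize(requested: Long): Int =
    math.min(requested, MaxRoundedArrayLength.toLong).toInt
}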

File tree

7 files changed, +32 -26 lines changed


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 10 additions & 7 deletions

@@ -387,8 +387,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)

   private[spark] val CHECKPOINT_COMPRESS =
@@ -459,17 +460,19 @@ package object config {
         "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
         "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+        s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")

   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+        s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")

   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -580,7 +583,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 4 additions & 2 deletions

@@ -33,6 +33,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer

@@ -217,7 +218,7 @@ private[spark] class EncryptedBlockData(
     var remaining = blockSize
     val chunks = new ListBuffer[ByteBuffer]()
     while (remaining > 0) {
-      val chunkSize = math.min(remaining, Int.MaxValue)
+      val chunkSize = math.min(remaining, ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
       val chunk = allocator(chunkSize.toInt)
       remaining -= chunkSize
       JavaUtils.readFully(source, chunk)
@@ -235,7 +236,8 @@ private[spark] class EncryptedBlockData(
     // This is used by the block transfer service to replicate blocks. The upload code reads
     // all bytes into memory to send the block to the remote executor, so it's ok to do this
     // as long as the block fits in a Java array.
-    assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
+    assert(blockSize <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "Block is too large to be wrapped in a byte buffer.")
     val dst = ByteBuffer.allocate(blockSize.toInt)
     val in = open()
     try {
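
The EncryptedBlockData change caps each chunk rather than the whole block, so a block larger than one array is still read in pieces. A self-contained sketch of that chunking loop, assuming the same Int.MaxValue - 15 cap; the allocator parameter here is a stand-in for whatever supplies the ByteBuffers.

import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer

object BlockChunking {
  private val MaxRoundedArrayLength = Int.MaxValue - 15 // assumed cap

  // Split blockSize bytes into ByteBuffers no larger than the maximum safe array length.
  def chunk(blockSize: Long, allocator: Int => ByteBuffer): Seq[ByteBuffer] = {
    var remaining = blockSize
    val chunks = new ListBuffer[ByteBuffer]()
    while (remaining > 0) {
      val chunkSize = math.min(remaining, MaxRoundedArrayLength.toLong)
      chunks += allocator(chunkSize.toInt) // the real code then fills the chunk via readFully
      remaining -= chunkSize
    }
    chunks.toList
  }
}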

core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala

Lines changed: 4 additions & 3 deletions

@@ -34,6 +34,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -333,11 +334,11 @@ private[spark] class MemoryStore(

     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
-    val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
+    val chunkSize = if (initialMemoryThreshold > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
         s"is too large to be set as chunk size. Chunk size has been capped to " +
-        s"${Utils.bytesToString(Int.MaxValue)}")
-      Int.MaxValue
+        s"${Utils.bytesToString(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)}")
+      ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
     } else {
       initialMemoryThreshold.toInt
     }
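
The MemoryStore change clamps a Long configuration value before it is used as an Int chunk size, logging a warning when it has to cap. A minimal sketch of that clamp, with the warning elided and the cap value assumed.

object UnrollChunkSize {
  private val MaxRoundedArrayLength = Int.MaxValue - 15 // assumed cap

  def chunkSizeFor(initialMemoryThreshold: Long): Int =
    if (initialMemoryThreshold > MaxRoundedArrayLength) {
      // the real code logs that the threshold was capped before falling back
      MaxRoundedArrayLength
    } else {
      initialMemoryThreshold.toInt
    }
}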

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

Lines changed: 1 addition & 1 deletion

@@ -97,7 +97,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
    */
   def toArray: Array[Byte] = {
-    if (size >= Integer.MAX_VALUE) {
+    if (size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       throw new UnsupportedOperationException(
         s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
     }
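
The toArray guard fails fast before attempting an allocation the JVM would reject anyway. A rough standalone equivalent that concatenates chunk arrays only when the combined size fits; the cap value is assumed.

object ToArraySketch {
  private val MaxRoundedArrayLength = Int.MaxValue - 15 // assumed cap

  def toArray(chunks: Seq[Array[Byte]]): Array[Byte] = {
    val size = chunks.map(_.length.toLong).sum
    if (size >= MaxRoundedArrayLength) {
      throw new UnsupportedOperationException(
        s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
    }
    Array.concat(chunks: _*)
  }
}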

mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

Lines changed: 7 additions & 6 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.array.ByteArrayMethods

 /**
  * Trait for a local matrix.
@@ -456,7 +457,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def zeros(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols))
   }
@@ -469,7 +470,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def ones(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
   }
@@ -499,7 +500,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextDouble()))
   }
@@ -513,7 +514,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextGaussian()))
   }
@@ -846,8 +847,8 @@ object SparseMatrix {
       s"density must be a double in the range 0.0 <= d <= 1.0. Currently, density: $density")
     val size = numRows.toLong * numCols
     val expected = size * density
-    assert(expected < Int.MaxValue,
-      "The expected number of nonzeros cannot be greater than Int.MaxValue.")
+    assert(expected < ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "The expected number of nonzeros cannot be greater than Int.MaxValue - 15.")
     val nnz = math.ceil(expected).toInt
     if (density == 0.0) {
       new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array.empty, Array.empty)
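
All five matrix factories share one guard: compute numRows * numCols as a Long so the product cannot overflow Int, then compare it against the array-length cap before allocating. A sketch of that guard, with the cap value assumed and a hypothetical helper name.

object MatrixAllocationGuard {
  private val MaxRoundedArrayLength = Int.MaxValue - 15 // assumed cap

  def checkedDenseSize(numRows: Int, numCols: Int): Int = {
    require(numRows.toLong * numCols <= MaxRoundedArrayLength,
      s"$numRows x $numCols dense matrix is too large to allocate")
    numRows * numCols
  }
}

// checkedDenseSize(46341, 46341) throws: 46341 * 46341 = 2147488281 > Int.MaxValue - 15,
// whereas an Int-only product would have overflowed to a negative value and slipped past the check.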

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions

@@ -27,7 +27,6 @@ import scala.collection.immutable
 import scala.util.matching.Regex

 import org.apache.hadoop.fs.Path
-import org.tukaani.xz.LZMA2Options

 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
@@ -36,6 +35,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils

 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1208,7 +1208,7 @@ object SQLConf {
     .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
       "join operator")
     .intConf
-    .createWithDefault(Int.MaxValue)
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
@@ -1442,7 +1442,7 @@ object SQLConf {
       "'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
       " in memory, otherwise do a global sort which spills to disk if necessary.")
     .intConf
-    .createWithDefault(Int.MaxValue)
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
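
Both SQLConf defaults above are row-count thresholds whose "effectively unlimited" value used to be Int.MaxValue; because downstream operators may size an in-memory buffer from the threshold, the default now stays strictly below the JVM array limit. A hedged sketch of the resulting defaults; the value and the names below are assumptions, not Spark API.

object ThresholdDefaults {
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15 // assumed cap

  // "Unlimited in practice" defaults that can still back a single array allocation.
  val sortMergeJoinInMemoryThresholdDefault: Int = MaxRoundedArrayLength
  val topKSortFallbackThresholdDefault: Int = MaxRoundedArrayLength
}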

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 3 additions & 4 deletions

@@ -18,7 +18,6 @@
 package org.apache.spark.sql

 import java.io.CharArrayWriter
-import java.sql.{Date, Timestamp}

 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -46,7 +45,6 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
@@ -57,6 +55,7 @@ import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils

@@ -287,7 +286,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int = 20,
       vertical: Boolean = false): String = {
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data.
     val tmpRows = getRows(numRows, truncate)

@@ -3264,7 +3263,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int): Array[Any] = {
     EvaluatePython.registerPicklers()
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     val rows = getRows(numRows, truncate).map(_.toArray).toArray
     val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
     val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
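
The Dataset change clamps a caller-supplied row count; the extra "- 1" leaves room for the one additional row that getRows fetches to detect whether more data exists, per the comment in the first hunk. A minimal sketch of the clamp, with the cap value assumed.

object RowCountClamp {
  private val MaxRoundedArrayLength = Int.MaxValue - 15 // assumed cap

  // Clamp to [0, cap - 1] so that fetching numRows + 1 rows stays within the safe range.
  def clamp(requested: Int): Int =
    requested.max(0).min(MaxRoundedArrayLength - 1)
}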
