
Commit bebd2e1

Feng Liu authored and gatorsmile committed
[SPARK-22222][CORE] Fix the ARRAY_MAX in BufferHolder and add a test
## What changes were proposed in this pull request?

We should not break the assumption that the length of the allocated byte array is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170

So we want to use `Integer.MAX_VALUE - 15` instead of `Integer.MAX_VALUE - 8` as the upper bound of an allocated byte array.

cc: srowen gatorsmile

## How was this patch tested?

Since the Spark unit test JVM has less than 1GB of heap, the test code here runs as a submit job so that it can run on a JVM with 4GB of memory.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Feng Liu <[email protected]>

Closes apache#19460 from liufengdb/fix_array_max.
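The arithmetic behind the new constant is easy to check in isolation. The snippet below (a standalone sketch, not part of the patch) verifies that the old cap `Integer.MAX_VALUE - 8` is not a multiple of 8, while `Integer.MAX_VALUE - 15` is the largest multiple of 8 that still stays at or below the JVM-safe allocation limit:

```java
public class ArrayMaxCheck {
  public static void main(String[] args) {
    int oldCap = Integer.MAX_VALUE - 8;   // 2147483639, the previous ARRAY_MAX
    int newCap = Integer.MAX_VALUE - 15;  // 2147483632, the new MAX_ROUNDED_ARRAY_LENGTH

    System.out.println(oldCap % 8); // 7 -> a buffer of this length is not word rounded
    System.out.println(newCap % 8); // 0 -> word rounded, so UnsafeRow's assumption holds

    // Rounding the old cap up to the next word overshoots the JVM-safe limit:
    int roundedOldCap = oldCap + (8 - oldCap % 8); // 2147483640
    System.out.println(roundedOldCap > Integer.MAX_VALUE - 8); // true
  }
}
```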
1 parent 71c2b81 commit bebd2e1

File tree

7 files changed: +124 -35

common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java

Lines changed: 7 additions & 0 deletions
```diff
@@ -40,6 +40,13 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
     }
   }
 
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller.
+  // Be conservative and lower the cap a little.
+  // Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229"
+  // This value is word rounded. Use this value if the allocated byte arrays are used to store other
+  // types rather than bytes.
+  public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
+
   private static final boolean unaligned = Platform.unaligned();
   /**
    * Optimized byte array equality check for byte arrays.
```
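For context, the helper whose tail appears in this hunk rounds a byte count up to the next multiple of 8; only its closing braces are visible in the diff, so the body below is a sketch of that behavior (an assumption, not the verbatim Spark source). The useful property is that the new cap is a fixed point of the rounding:

```java
final class WordRounding {
  // Sketch of the helper named in the hunk header above (assumed body).
  static int roundNumberOfBytesToNearestWord(int numBytes) {
    int remainder = numBytes & 0x07; // equivalent to numBytes % 8
    return remainder == 0 ? numBytes : numBytes + (8 - remainder);
  }

  public static void main(String[] args) {
    // MAX_ROUNDED_ARRAY_LENGTH is already word rounded, so rounding it is a no-op:
    int cap = Integer.MAX_VALUE - 15;
    System.out.println(roundNumberOfBytesToNearestWord(cap) == cap); // true
  }
}
```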

core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java

Lines changed: 3 additions & 3 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.unsafe.map;
 
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+
 /**
  * Interface that defines how we can grow the size of a hash map when it is over a threshold.
  */
@@ -31,9 +33,7 @@ public interface HashMapGrowthStrategy {
 
   class Doubling implements HashMapGrowthStrategy {
 
-    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
-    // smaller. Be conservative and lower the cap a little.
-    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+    private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
     @Override
     public int nextCapacity(int currentCapacity) {
```
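The body of `nextCapacity` is cut off by the hunk. A minimal sketch of a doubling-with-cap policy consistent with the `Doubling` name (an assumption, not the verbatim Spark code) shows how the new `ARRAY_MAX` takes effect:

```java
final class DoublingSketch {
  private static final int ARRAY_MAX = Integer.MAX_VALUE - 15; // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

  // Assumed shape of Doubling.nextCapacity: double the capacity, clamp at
  // ARRAY_MAX, and guard against int overflow when doubling.
  static int nextCapacity(int currentCapacity) {
    int doubled = currentCapacity * 2;
    return (doubled > 0) ? Math.min(doubled, ARRAY_MAX) : ARRAY_MAX;
  }

  public static void main(String[] args) {
    System.out.println(nextCapacity(64));                     // 128
    System.out.println(nextCapacity(Integer.MAX_VALUE / 2));  // clamps to ARRAY_MAX
  }
}
```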

core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection
 
 import java.util.Comparator
 
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.collection.WritablePartitionedPairCollection._
 
 /**
@@ -96,7 +98,5 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
-  // smaller. Be conservative and lower the cap a little.
-  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
+  val MAXIMUM_CAPACITY: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 2
 }
```
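The halving reflects the buffer's layout: each logical entry occupies two slots in a single underlying object array, one for the (partition, key) pair and one for the value (an assumption based on the class's pairwise storage), so the logical capacity can be at most half the maximum array length:

```java
public class PairBufferCapacity {
  public static void main(String[] args) {
    // Assumed layout: one (partition, key) slot plus one value slot per entry.
    int maxArrayLength = Integer.MAX_VALUE - 15; // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
    int maxCapacity = maxArrayLength / 2;        // largest number of (key, value) entries
    System.out.println(maxCapacity);             // 1073741816
  }
}
```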

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 28 additions & 24 deletions
```diff
@@ -100,6 +100,8 @@ class SparkSubmitSuite
     with TimeLimits
     with TestPrematureExit {
 
+  import SparkSubmitSuite._
+
   override def beforeEach() {
     super.beforeEach()
     System.setProperty("spark.testing", "true")
@@ -974,30 +976,6 @@ class SparkSubmitSuite
     }
   }
 
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  private def runSparkSubmit(args: Seq[String]): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    val sparkSubmitFile = if (Utils.isWindows) {
-      new File("..\\bin\\spark-submit.cmd")
-    } else {
-      new File("../bin/spark-submit")
-    }
-    val process = Utils.executeCommand(
-      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
-      new File(sparkHome),
-      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-
-    try {
-      val exitCode = failAfter(60 seconds) { process.waitFor() }
-      if (exitCode != 0) {
-        fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
-      }
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
-
   private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
     val tmpDir = Utils.createTempDir()
 
@@ -1020,6 +998,32 @@ class SparkSubmitSuite
   }
 }
 
+object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
+  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+  def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = {
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    val sparkSubmitFile = if (Utils.isWindows) {
+      new File(s"$root\\bin\\spark-submit.cmd")
+    } else {
+      new File(s"$root/bin/spark-submit")
+    }
+    val process = Utils.executeCommand(
+      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
+      new File(sparkHome),
+      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+
+    try {
+      val exitCode = failAfter(60 seconds) { process.waitFor() }
+      if (exitCode != 0) {
+        fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
+      }
+    } finally {
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
+    }
+  }
+}
+
 object JarCreationTest extends Logging {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")
```

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java

Lines changed: 3 additions & 4 deletions
```diff
@@ -19,6 +19,7 @@
 
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 
 /**
  * A helper class to manage the data buffer for an unsafe row. The data buffer can grow and
@@ -36,9 +37,7 @@
  */
 public class BufferHolder {
 
-  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
-  // smaller. Be conservative and lower the cap a little.
-  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+  private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
@@ -51,7 +50,7 @@ public BufferHolder(UnsafeRow row) {
 
   public BufferHolder(UnsafeRow row, int initialSize) {
     int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
-    if (row.numFields() > (Integer.MAX_VALUE - initialSize - bitsetWidthInBytes) / 8) {
+    if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
       throw new UnsupportedOperationException(
         "Cannot create BufferHolder for input UnsafeRow because there are " +
         "too many fields (number of fields: " + row.numFields() + ")");
```
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala

Lines changed: 78 additions & 0 deletions
```diff
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Timeouts
+
+import org.apache.spark.{SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.SparkSubmitSuite
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.array.ByteArrayMethods
+import org.apache.spark.util.ResetSystemProperties
+
+// A test for growing the buffer holder to nearly 2GB. Due to the heap size limitation of the
+// Spark unit test JVM, the actual test code runs as a submit job.
+class BufferHolderSparkSubmitSuite
+  extends SparkFunSuite
+    with Matchers
+    with BeforeAndAfterEach
+    with ResetSystemProperties
+    with Timeouts {
+
+  test("SPARK-22222: Buffer holder should be able to allocate memory larger than 1GB") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+    val argsForSparkSubmit = Seq(
+      "--class", BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"),
+      "--name", "SPARK-22222",
+      "--master", "local-cluster[2,1,1024]",
+      "--driver-memory", "4g",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", "spark.driver.extraJavaOptions=-ea",
+      unusedJar.toString)
+    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
+  }
+}
+
+object BufferHolderSparkSubmitSuite {
+
+  def main(args: Array[String]): Unit = {
+
+    val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+    val holder = new BufferHolder(new UnsafeRow(1000))
+
+    holder.reset()
+    holder.grow(roundToWord(ARRAY_MAX / 2))
+
+    holder.reset()
+    holder.grow(roundToWord(ARRAY_MAX / 2 + 8))
+
+    holder.reset()
+    holder.grow(roundToWord(Integer.MAX_VALUE / 2))
+
+    holder.reset()
+    holder.grow(roundToWord(Integer.MAX_VALUE))
+  }
+
+  private def roundToWord(len: Int): Int = {
+    ByteArrayMethods.roundNumberOfBytesToNearestWord(len)
+  }
+}
```
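What the submit job exercises: `BufferHolder.grow` doubles the buffer until the required length approaches the cap and then clamps to `ARRAY_MAX`, which is why the test grows past `ARRAY_MAX / 2`. A sketch of that growth policy (an assumed shape, not the verbatim Spark method):

```java
final class GrowthPolicySketch {
  static final int ARRAY_MAX = Integer.MAX_VALUE - 15; // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

  // Assumed shape of the growth policy the test exercises: reject requests over
  // the cap, double otherwise, and clamp to ARRAY_MAX near the top.
  static int nextBufferLength(int requiredLength) {
    if (requiredLength > ARRAY_MAX) {
      throw new UnsupportedOperationException(
          "Cannot grow buffer beyond " + ARRAY_MAX + " bytes");
    }
    return requiredLength < ARRAY_MAX / 2 ? requiredLength * 2 : ARRAY_MAX;
  }

  public static void main(String[] args) {
    // Both requests land at or past the midpoint, so both clamp to the cap:
    System.out.println(nextBufferLength(ARRAY_MAX / 2));     // 2147483632
    System.out.println(nextBufferLength(ARRAY_MAX / 2 + 8)); // 2147483632
  }
}
```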

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -23,6 +23,7 @@
 
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -595,7 +596,7 @@ public final int appendStruct(boolean isNull) {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
+  protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
```
