
Commit 466d011

heary-cao authored and cloud-fan committed
[SPARK-26117][CORE][SQL] use SparkOutOfMemoryError instead of OutOfMemoryError when catch exception
## What changes were proposed in this pull request?

PR apache#20014 introduced `SparkOutOfMemoryError` so that a memory-allocation failure inside Spark does not kill the entire executor the way a plain `OutOfMemoryError` would. Accordingly, code that requests memory through `MemoryConsumer.allocatePage` should catch `SparkOutOfMemoryError` instead of `OutOfMemoryError`.

## How was this patch tested?

N/A

Closes apache#23084 from heary-cao/SparkOutOfMemoryError.

Authored-by: caoxuewen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 92fc0a8 commit 466d011
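Editor's note: the sketch below illustrates the catch-narrowing pattern this commit applies across the code base. It is not code from the commit; `ExampleConsumer` and `tryAcquirePage` are hypothetical names, and the constructor call assumes the `MemoryConsumer(TaskMemoryManager, long, MemoryMode)` signature from this era of Spark.

```java
// Hedged sketch, not part of this commit: a MemoryConsumer subclass that
// catches SparkOutOfMemoryError around allocatePage. Only the
// MemoryConsumer/TaskMemoryManager APIs are real; the class is illustrative.
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;

class ExampleConsumer extends MemoryConsumer {
  private MemoryBlock currentPage;

  ExampleConsumer(TaskMemoryManager taskMemoryManager) {
    super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) {
    return 0L; // nothing to spill in this sketch
  }

  // Degrades gracefully instead of failing the task: SparkOutOfMemoryError
  // from allocatePage means Spark declined the request, which is recoverable;
  // a plain java.lang.OutOfMemoryError (real heap exhaustion) still propagates.
  boolean tryAcquirePage(long required) {
    try {
      currentPage = allocatePage(required);
    } catch (SparkOutOfMemoryError e) {
      return false;
    }
    return true;
  }
}
```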

File tree

6 files changed: +19 −15


core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 5 additions & 5 deletions

@@ -83,10 +83,10 @@ public void spill() throws IOException {
   public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
 
   /**
-   * Allocates a LongArray of `size`. Note that this method may throw `OutOfMemoryError` if Spark
-   * doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
-   * `LongArray` is too large to fit in a single page. The caller side should take care of these
-   * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
+   * Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError`
+   * if Spark doesn't have enough memory for this allocation, or throw `TooLargePageException`
+   * if this `LongArray` is too large to fit in a single page. The caller side should take care of
+   * these two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
    *
    * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
@@ -111,7 +111,7 @@ public void freeArray(LongArray array) {
   /**
    * Allocate a memory block with at least `required` bytes.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    */
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
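Why this Javadoc change matters: `SparkOutOfMemoryError` (from apache#20014) extends `OutOfMemoryError` but is raised by Spark's own memory management when it declines an execution-memory request, so callers that catch the narrower type can recover from a Spark-managed allocation failure without also swallowing genuine JVM heap exhaustion.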

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 3 additions & 2 deletions

@@ -31,6 +31,7 @@
 import org.apache.spark.SparkEnv;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
@@ -741,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
     if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
       try {
         growAndRehash();
-      } catch (OutOfMemoryError oom) {
+      } catch (SparkOutOfMemoryError oom) {
         canGrowArray = false;
       }
     }
@@ -757,7 +758,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
   private boolean acquireNewPage(long required) {
     try {
       currentPage = allocatePage(required);
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       return false;
     }
     dataPages.add(currentPage);
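Both call sites in `BytesToBytesMap` already treated allocation failure as recoverable: when `growAndRehash()` fails, `append` sets `canGrowArray = false` so the map simply stops growing, and when `acquireNewPage` fails, `append` returns `false`. Narrowing the catch to `SparkOutOfMemoryError` preserves those recovery paths while letting a genuine JVM `OutOfMemoryError` propagate.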

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 4 additions & 3 deletions

@@ -38,6 +38,7 @@
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.JavaSerializer;
 import org.apache.spark.serializer.SerializerInstance;
@@ -534,10 +535,10 @@ public void testOOMDuringSpill() throws Exception {
       insertNumber(sorter, 1024);
       fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
     }
-    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
-    catch (OutOfMemoryError oom){
+    // we expect an SparkOutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch (SparkOutOfMemoryError oom){
       String oomStackTrace = Utils.exceptionString(oom);
-      assertThat("expected OutOfMemoryError in " +
+      assertThat("expected SparkOutOfMemoryError in " +
         "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
         oomStackTrace,
         Matchers.containsString(

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java

Lines changed: 3 additions & 2 deletions

@@ -27,6 +27,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -178,8 +179,8 @@ public int compare(
     testMemoryManager.markExecutionAsOutOfMemoryOnce();
     try {
       sorter.reset();
-      fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
-    } catch (OutOfMemoryError oom) {
+      fail("expected SparkOutOfMemoryError but it seems operation surprisingly succeeded");
+    } catch (SparkOutOfMemoryError oom) {
       // as expected
     }
     // [SPARK-21907] this failed on NPE at
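The two test-suite changes follow the same narrowing: `TestMemoryManager.markExecutionAsOutOfMemoryOnce()` forces the next execution-memory request to fail, and since apache#20014 that failure surfaces as `SparkOutOfMemoryError` rather than a plain `OutOfMemoryError`, so the catch blocks (and their failure messages) must name the narrower type.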

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java

Lines changed: 2 additions & 1 deletion

@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -126,7 +127,7 @@ public final void close() {
   private boolean acquirePage(long requiredSize) {
     try {
       page = allocatePage(requiredSize);
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       logger.warn("Failed to allocate page ({} bytes).", requiredSize);
       return false;
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala

Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ import com.google.common.io.Closeables
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.NioBufferedFileInputStream
-import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.unsafe.Platform
@@ -226,7 +226,7 @@ private[python] case class HybridRowQueue(
     val page = try {
       allocatePage(required)
     } catch {
-      case _: OutOfMemoryError =>
+      case _: SparkOutOfMemoryError =>
         null
     }
     val buffer = if (page != null) {
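As in the other call sites, the fallback behavior here is unchanged: a failed `allocatePage(required)` leaves `page` as `null`, and the `if (page != null)` that follows selects the alternative buffer path; only the exception type being caught is narrowed.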
