
Commit 3a7494d

Sital Kedia authored and cloud-fan committed
[SPARK-22827][CORE] Avoid throwing OutOfMemoryError in case of exception in spill
## What changes were proposed in this pull request?

Currently, the task memory manager throws an OutOfMemoryError when an IO exception happens in spill() - https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194. Similarly, there are many other places in the code where, if a task fails to acquire memory due to an exception, we throw an OutOfMemoryError, which kills the entire executor and hence fails all the tasks running on that executor, instead of failing just the one task.

## How was this patch tested?

Unit tests

Author: Sital Kedia <[email protected]>

Closes apache#20014 from sitalkedia/skedia/upstream_SPARK-22827.
1 parent 6129ffa commit 3a7494d
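
In short, the change makes memory-acquisition failures task-scoped instead of executor-fatal: rather than throwing a plain `OutOfMemoryError` (which the executor treats as fatal and exits on), the affected code paths now throw a `SparkOutOfMemoryError` that fails only the current task. Below is a minimal, hypothetical Java sketch of the pattern, not the committed Spark code; `Spillable`, `acquireWithSpill`, and the nested error class are illustrative stand-ins.

```java
import java.io.IOException;

// Sketch of the pattern introduced by this commit (not the actual Spark code).
// A task-scoped error type lets the executor distinguish "this task could not
// acquire execution memory" from a genuine JVM-wide OutOfMemoryError.
final class TaskOutOfMemorySketch {

  // Stand-in for org.apache.spark.memory.SparkOutOfMemoryError: it still extends
  // OutOfMemoryError so existing catch sites keep working, but the executor can
  // recognize it and fail only the current task instead of exiting the JVM.
  static final class SparkOutOfMemoryError extends OutOfMemoryError {
    SparkOutOfMemoryError(String msg) { super(msg); }
  }

  interface Spillable {
    long spill() throws IOException; // may fail with an IO error while writing to disk
  }

  // Before: an IOException during spill() was rethrown as a plain OutOfMemoryError,
  // which the executor treats as fatal, killing every task running on it.
  // After: it is rethrown as SparkOutOfMemoryError, which fails only this task.
  static long acquireWithSpill(Spillable consumer) {
    try {
      return consumer.spill();
    } catch (IOException e) {
      throw new SparkOutOfMemoryError(
          "error while calling spill() on " + consumer + " : " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    Spillable failing = () -> { throw new IOException("disk full"); };
    try {
      acquireWithSpill(failing);
    } catch (SparkOutOfMemoryError oom) {
      System.out.println("task-level failure only: " + oom.getMessage());
    }
  }
}
```

The actual commit applies this conversion at each site listed in the diff below and teaches the executor's uncaught-exception handling to ignore the new error type.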

File tree

8 files changed (+50, -11 lines)

core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 2 additions & 2 deletions
@@ -88,7 +88,7 @@ public void spill() throws IOException {
    * `LongArray` is too large to fit in a single page. The caller side should take care of these
    * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
    */
   public LongArray allocateArray(long size) {
@@ -154,6 +154,6 @@ private void throwOom(final MemoryBlock page, final long required) {
       taskMemoryManager.freePage(page, this);
     }
     taskMemoryManager.showMemoryUsage();
-    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
   }
 }

core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * This exception is thrown when a task can not acquire memory from the Memory manager.
+ * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
+ * we should use throw this exception, which just kills the current task.
+ */
+@Private
+public final class SparkOutOfMemoryError extends OutOfMemoryError {
+
+  public SparkOutOfMemoryError(String s) {
+    super(s);
+  }
+
+  public SparkOutOfMemoryError(OutOfMemoryError e) {
+    super(e.getMessage());
+  }
+}

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 2 additions & 2 deletions
@@ -192,7 +192,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
             throw new RuntimeException(e.getMessage());
           } catch (IOException e) {
             logger.error("error while calling spill() on " + c, e);
-            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
               + e.getMessage());
           }
         }
@@ -213,7 +213,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           throw new RuntimeException(e.getMessage());
         } catch (IOException e) {
           logger.error("error while calling spill() on " + consumer, e);
-          throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
+          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
            + e.getMessage());
         }
       }

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 2 additions & 1 deletion
@@ -33,6 +33,7 @@
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
@@ -337,7 +338,7 @@ private void growPointerArrayIfNecessary() throws IOException {
         // The pointer array is too big to fix in a single page, spill.
         spill();
         return;
-      } catch (OutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError e) {
         // should have trigger spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
           logger.error("Unable to grow the pointer array");

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 2 additions & 1 deletion
@@ -25,6 +25,7 @@
 import java.util.function.Supplier;

 import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -349,7 +350,7 @@ private void growPointerArrayIfNecessary() throws IOException {
         // The pointer array is too big to fix in a single page, spill.
         spill();
         return;
-      } catch (OutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError e) {
         // should have trigger spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
           logger.error("Unable to grow the pointer array");

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@

 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
@@ -212,7 +213,7 @@ public boolean hasSpaceForAnotherRecord() {

   public void expandPointerArray(LongArray newArray) {
     if (newArray.size() < array.size()) {
-      throw new OutOfMemoryError("Not enough memory to grow pointer array");
+      throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
     }
     Platform.copyMemory(
       array.getBaseObject(),

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 3 deletions
@@ -35,7 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
 import org.apache.spark.shuffle.FetchFailedException
@@ -553,10 +553,9 @@ private[spark] class Executor(

         // Don't forcibly exit unless the exception was inherently fatal, to avoid
         // stopping other tasks unnecessarily.
-        if (Utils.isFatalError(t)) {
+        if (!t.isInstanceOf[SparkOutOfMemoryError] && Utils.isFatalError(t)) {
           uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
         }
-
       } finally {
         runningTasks.remove(taskId)
       }

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala

Lines changed: 2 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.aggregate

 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -205,7 +206,7 @@ class TungstenAggregationIterator(
       buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
       if (buffer == null) {
         // failed to allocate the first page
-        throw new OutOfMemoryError("No enough memory for aggregation")
+        throw new SparkOutOfMemoryError("No enough memory for aggregation")
       }
     }
     processRow(buffer, newInput)
