Skip to content

Commit 438f8fd

Browse files
szhemcloud-fan
authored andcommitted
[SPARK-26114][CORE] ExternalSorter's readingIterator field leak
## What changes were proposed in this pull request? This pull request fixes [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue that occurs when trying to reduce the number of partitions by means of coalesce without shuffling after shuffle-based transformations. The leak occurs because of not cleaning up `ExternalSorter`'s `readingIterator` field as it's done for its `map` and `buffer` fields. Additionally there are changes to the `CompletionIterator` to prevent capturing its `sub`-iterator and holding it even after the completion iterator completes. It is necessary because in some cases, e.g. in case of standard scala's `flatMap` iterator (which is used is `CoalescedRDD`'s `compute` method) the next value of the main iterator is assigned to `flatMap`'s `cur` field only after it is available. For DAGs where ShuffledRDD is a parent of CoalescedRDD it means that the data should be fetched from the map-side of the shuffle, but the process of fetching this data consumes quite a lot of memory in addition to the memory already consumed by the iterator held by `flatMap`'s `cur` field (until it is reassigned). For the following data ```scala import org.apache.hadoop.io._ import org.apache.hadoop.io.compress._ import org.apache.commons.lang._ import org.apache.spark._ // generate 100M records of sample data sc.makeRDD(1 to 1000, 1000) .flatMap(item => (1 to 100000) .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024)))) .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) ``` and the following job ```scala import org.apache.hadoop.io._ import org.apache.spark._ import org.apache.spark.storage._ val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text]) rdd .map(item => item._1.toString -> item._2.toString) .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) .coalesce(10,false) .count ``` ... executed like the following ```bash spark-shell \ --num-executors=5 \ --executor-cores=2 \ --master=yarn \ --deploy-mode=client \ --conf spark.executor.memoryOverhead=512 \ --conf spark.executor.memory=1g \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true' ``` ... executors are always failing with OutOfMemoryErrors. The main issue is multiple leaks of ExternalSorter references. For example, in case of 2 tasks per executor it is expected to be 2 simultaneous instances of ExternalSorter per executor but heap dump generated on OutOfMemoryError shows that there are more ones. ![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png) P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places. ## How was this patch tested? - Existing unit tests - New unit tests - Job executions on the live environment Here is the screenshot before applying this patch ![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png) Here is the screenshot after applying this patch ![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png) And in case of reducing the number of executors even more the job is still stable ![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png) Closes apache#23083 from szhem/SPARK-26114-externalsorter-leak. Authored-by: Sergey Zhemzhitsky <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 93112e6 commit 438f8fd

File tree

3 files changed

+29
-3
lines changed

3 files changed

+29
-3
lines changed

core/src/main/scala/org/apache/spark/util/CompletionIterator.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ private[spark]
2525
abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
2626

2727
private[this] var completed = false
28-
def next(): A = sub.next()
28+
private[this] var iter = sub
29+
def next(): A = iter.next()
2930
def hasNext: Boolean = {
30-
val r = sub.hasNext
31+
val r = iter.hasNext
3132
if (!r && !completed) {
3233
completed = true
34+
// reassign to release resources of highly resource consuming iterators early
35+
iter = Iterator.empty.asInstanceOf[I]
3336
completion()
3437
}
3538
r

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
727727
spills.clear()
728728
forceSpillFiles.foreach(s => s.file.delete())
729729
forceSpillFiles.clear()
730-
if (map != null || buffer != null) {
730+
if (map != null || buffer != null || readingIterator != null) {
731731
map = null // So that the memory can be garbage-collected
732732
buffer = null // So that the memory can be garbage-collected
733+
readingIterator = null // So that the memory can be garbage-collected
733734
releaseMemory()
734735
}
735736
}

core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.util
1919

20+
import java.lang.ref.PhantomReference
21+
import java.lang.ref.ReferenceQueue
22+
2023
import org.apache.spark.SparkFunSuite
2124

2225
class CompletionIteratorSuite extends SparkFunSuite {
@@ -44,4 +47,23 @@ class CompletionIteratorSuite extends SparkFunSuite {
4447
assert(!completionIter.hasNext)
4548
assert(numTimesCompleted === 1)
4649
}
50+
test("reference to sub iterator should not be available after completion") {
51+
var sub = Iterator(1, 2, 3)
52+
53+
val refQueue = new ReferenceQueue[Iterator[Int]]
54+
val ref = new PhantomReference[Iterator[Int]](sub, refQueue)
55+
56+
val iter = CompletionIterator[Int, Iterator[Int]](sub, {})
57+
sub = null
58+
iter.toArray
59+
60+
for (_ <- 1 to 100 if !ref.isEnqueued) {
61+
System.gc()
62+
if (!ref.isEnqueued) {
63+
Thread.sleep(10)
64+
}
65+
}
66+
assert(ref.isEnqueued)
67+
assert(refQueue.poll() === ref)
68+
}
4769
}

0 commit comments

Comments
 (0)