Skip to content

Commit 87bd9c7

Browse files
Brandon Krieger authored and srowen committed
[SPARK-25998][CORE] Change TorrentBroadcast to hold weak reference of broadcast object
## What changes were proposed in this pull request? This PR changes the broadcast object in TorrentBroadcast from a strong reference to a soft reference (`java.lang.ref.SoftReference`). This allows it to be garbage collected under memory pressure even if the Dataset is held in memory. This is OK, because the broadcast object can always be re-read. ## How was this patch tested? Tested in Spark shell by taking a heap dump; full repro steps are listed in https://issues.apache.org/jira/browse/SPARK-25998. Closes apache#22995 from bkrieger/bk/torrent-broadcast-weak. Authored-by: Brandon Krieger <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent ce61bac commit 87bd9c7

File tree

1 file changed

+16
-6
lines changed

1 file changed

+16
-6
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.broadcast
1919

2020
import java.io._
21+
import java.lang.ref.SoftReference
2122
import java.nio.ByteBuffer
2223
import java.util.zip.Adler32
2324

@@ -61,9 +62,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
6162
* Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
6263
* which builds this value by reading blocks from the driver and/or other executors.
6364
*
64-
* On the driver, if the value is required, it is read lazily from the block manager.
65+
* On the driver, if the value is required, it is read lazily from the block manager. We hold
66+
* a soft reference so that it can be garbage collected if required, as we can always reconstruct
67+
* in the future.
6568
*/
66-
@transient private lazy val _value: T = readBroadcastBlock()
69+
@transient private var _value: SoftReference[T] = _
6770

6871
/** The compression codec to use, or None if compression is disabled */
6972
@transient private var compressionCodec: Option[CompressionCodec] = _
@@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
9295
/** The checksum for all the blocks. */
9396
private var checksums: Array[Int] = _
9497

95-
override protected def getValue() = {
96-
_value
98+
override protected def getValue() = synchronized {
99+
val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
100+
if (memoized != null) {
101+
memoized
102+
} else {
103+
val newlyRead = readBroadcastBlock()
104+
_value = new SoftReference[T](newlyRead)
105+
newlyRead
106+
}
97107
}
98108

99109
private def calcChecksum(block: ByteBuffer): Int = {
@@ -205,8 +215,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
205215
}
206216

207217
private def readBroadcastBlock(): T = Utils.tryOrIOException {
208-
TorrentBroadcast.synchronized {
209-
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
218+
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
219+
broadcastCache.synchronized {
210220

211221
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
212222
setConf(SparkEnv.get.conf)

0 commit comments

Comments
 (0)