diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index d2229e2498bf5..c62b172b70ae9 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -33,7 +33,7 @@
Summary
Off Heap Storage Memory
+ title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory. Will be 0 if off heap flag is set to false">Off Heap Storage Memory
|
Disk Used |
Cores |
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 639b82b6080b3..842c4240ba638 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -58,7 +58,8 @@ private[spark] abstract class MemoryManager(
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
- protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
+ protected[this] val maxOffHeapMemory =
+ if (conf.get(MEMORY_OFFHEAP_ENABLED)) conf.get(MEMORY_OFFHEAP_SIZE) else 0
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong
@@ -233,6 +234,8 @@ private[spark] abstract class MemoryManager(
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
+ if (conf.get(MEMORY_OFFHEAP_SIZE) > 0)
+ logWarning("spark.memory.offHeap.size is > 0 when spark.memory.offHeap.enabled == false")
MemoryMode.ON_HEAP
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 52856427cb37a..f506018355d81 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -884,7 +884,11 @@ private[spark] class AppStatusListener(
exec.hostPort = event.blockManagerId.hostPort
event.maxOnHeapMem.foreach { _ =>
exec.totalOnHeap = event.maxOnHeapMem.get
- exec.totalOffHeap = event.maxOffHeapMem.get
+ if (event.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER) {
+ exec.totalOffHeap = 0
+ } else {
+ exec.totalOffHeap = event.maxOffHeapMem.get
+ }
// SPARK-30594: whenever(first time or re-register) a BlockManager added, all blocks
// from this BlockManager will be reported to driver later. So, we should clean up
// used memory to avoid overlapped count.
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
index fc63185b4c4ac..999f7ff5fc1a1 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -16,7 +16,7 @@
"maxMemory" : 908381388,
"maxTasks" : 0,
"memoryMetrics" : {
- "totalOffHeapStorageMemory" : 524288000,
+ "totalOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"usedOffHeapStorageMemory" : 0,
"usedOnHeapStorageMemory" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_excludeOnFailure_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_excludeOnFailure_expectation.json
index fc63185b4c4ac..999f7ff5fc1a1 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_excludeOnFailure_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_excludeOnFailure_expectation.json
@@ -16,7 +16,7 @@
"maxMemory" : 908381388,
"maxTasks" : 0,
"memoryMetrics" : {
- "totalOffHeapStorageMemory" : 524288000,
+ "totalOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"usedOffHeapStorageMemory" : 0,
"usedOnHeapStorageMemory" : 0
diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index fa7f1238bcfd9..cc7691c14aed7 100644
--- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -256,11 +256,19 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext {
.get(ResourceProfile.OFFHEAP_MEM).map(_.amount).getOrElse(-1L))
// Ensure all executors has started
- TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+ TestUtils.waitUntilExecutorsUp(sc, 2, 120000)
+ Thread.sleep(10000) // TODO(review): replace fixed sleep with eventually {} to avoid flakiness
+
+ // Check executor memory is also updated for all executors except driver
+ val executorInfos = sc.statusTracker.getExecutorInfos
+ val execWithMemory = executorInfos.filter(_.totalOffHeapStorageMemory() > 0)
+
+ assert(execWithMemory.length >= 2,
s"Expected at least 2 executors with memory, got ${execWithMemory.length}")
+ execWithMemory.foreach { exec =>
+ assert(exec.totalOffHeapStorageMemory() == MemoryOverridePlugin.offHeapMemory)
+ }
- // Check executor memory is also updated
- val execInfo = sc.statusTracker.getExecutorInfos.head
- assert(execInfo.totalOffHeapStorageMemory() == MemoryOverridePlugin.offHeapMemory)
} finally {
if (sc != null) {
sc.stop()
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 1f40ef944a843..63e92d0cefdae 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -143,7 +143,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
*/
protected def createMemoryManager(
maxOnHeapExecutionMemory: Long,
- maxOffHeapExecutionMemory: Long = 0L): MemoryManager
+ maxOffHeapExecutionMemory: Long = 0L,
+ isOffHeapEnabled: Boolean = true): MemoryManager
+
// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
@@ -318,7 +320,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
test("off-heap execution allocations cannot exceed limit") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 2L,
- maxOffHeapExecutionMemory = 1000L)
+ maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)
val tMemManager = new TaskMemoryManager(memoryManager, 1)
val c = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
@@ -336,10 +338,10 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
}
- test("task peak execution memory usage") {
+ test("task peak execution memory usage when offheap memory is enabled") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
- maxOffHeapExecutionMemory = 1000L)
+ maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)
val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
@@ -353,6 +355,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
}
+
+ test("task peak execution memory usage when offheap memory is disabled") {
+ val memoryManager = createMemoryManager(
+ maxOnHeapExecutionMemory = 1000L,
+ maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = false)
+
+ val tMemManager = new TaskMemoryManager(memoryManager, 1)
+ val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
+ val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)
+
+ val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
+ val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
+ assert(result1 === 0L)
+ assert(result2 === 400L)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 400L)
+ assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
+ assert(tMemManager.getPeakOffHeapExecutionMemory === 0L)
+ }
}
private object MemoryManagerSuite {
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9f0e622b1d515..7182d100bd1cd 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -42,12 +42,14 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
override protected def createMemoryManager(
maxOnHeapExecutionMemory: Long,
- maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+ maxOffHeapExecutionMemory: Long,
+ isOffHeapEnabled: Boolean = false): UnifiedMemoryManager = {
val conf = new SparkConf()
.set(MEMORY_FRACTION, 1.0)
.set(TEST_MEMORY, maxOnHeapExecutionMemory)
.set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
.set(MEMORY_STORAGE_FRACTION, storageFraction)
+ .set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
UnifiedMemoryManager(conf, numCores = 1)
}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 4d75f5d7a1fc7..d9d266d3fe7a0 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1807,6 +1807,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
val memoryMetrics = d.info.memoryMetrics.get
assert(memoryMetrics.usedOffHeapStorageMemory == 0)
assert(memoryMetrics.usedOnHeapStorageMemory == 0)
+ assert(memoryMetrics.totalOffHeapStorageMemory == 0)
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f5fca56e5ef77..7cab696d46627 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -75,9 +75,11 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
- memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
+ memoryManager: Option[UnifiedMemoryManager] = None,
+ isOffHeapEnabled: Boolean = false): BlockManager = {
conf.set(TEST_MEMORY, maxMem)
conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
+ conf.set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
val serializerManager = new SerializerManager(serializer, conf)
val transfer = new NettyBlockTransferService(
conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
@@ -98,6 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf.set(MEMORY_FRACTION, 1.0)
conf.set(MEMORY_STORAGE_FRACTION, 0.999)
conf.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+ conf.set(MEMORY_OFFHEAP_ENABLED, false)
// to make cached peers refresh frequently
conf.set(STORAGE_CACHED_PEERS_TTL, 10)
@@ -403,7 +406,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// As many stores as the replication factor
val stores = (1 to maxReplication).map {
- i => makeBlockManager(storeSize, s"store$i")
+ i => makeBlockManager(storeSize, s"store$i", isOffHeapEnabled = true)
}
storageLevels.foreach { storageLevel =>
@@ -587,6 +590,7 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
new SparkConf(false)
.set("spark.app.id", "test")
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
+ .set(MEMORY_OFFHEAP_ENABLED, true)
.set(
STORAGE_REPLICATION_POLICY,
classOf[BasicBlockReplicationPolicy].getName)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f4bb5b7cf7cb8..7da2dc8dfdb83 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -129,6 +129,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
bmConf.set(TEST_MEMORY, maxMem)
bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem)
+ if (maxMem > 0) {
+ bmConf.set(MEMORY_OFFHEAP_ENABLED, true)
+ }
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 5d02dd753845e..9bbc53020761c 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -122,6 +122,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
.set(UI_KILL_ENABLED, killEnabled)
.set(UI_TIMELINE_ENABLED, timelineEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
+ .set(MEMORY_OFFHEAP_ENABLED, true)
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
@@ -151,7 +152,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
test("effects of unpersist() / persist() should be reflected") {
// Regression test for SPARK-2527
- withSpark(newSparkContext()) { sc =>
+ withSpark(newSparkContext(master = "local-cluster[1,1,1024]")) { sc =>
val ui = sc.ui.get
val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.persist(StorageLevels.DISK_ONLY).count()