Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4c49196
SPARK-54449: Check if off heap config is enabled before getting off h…
VindhyaG Dec 8, 2025
100dc43
SPARK-54449 : fix scala styling
VindhyaG Dec 8, 2025
4282a71
Revert "SPARK-54449 : fix scala styling"
VindhyaG Dec 8, 2025
5615d1c
SPARK-54449: fix scala styling
VindhyaG Dec 8, 2025
a2f4e27
SPARK-54449: add flag to UT
VindhyaG Dec 9, 2025
0a79a8d
UTs
VindhyaG Dec 9, 2025
c701ee2
SPARK-54449: ut fix
VindhyaG Dec 11, 2025
a48fbaf
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 2, 2026
951c31a
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 3, 2026
dcd9d03
SPARK-54449: Add hover over message
VindhyaG Feb 4, 2026
c13cc9a
SPARK-54449: Driver should show 0 off heap no matter the settings
VindhyaG Feb 4, 2026
00e7b74
SPARK-54449: UT fixes
VindhyaG Feb 6, 2026
30fe113
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 6, 2026
adf2292
SPARK-54449: Add warning if off heap size is disabled but value is no…
VindhyaG Feb 24, 2026
20df98c
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 24, 2026
df91792
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 24, 2026
8fda937
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 25, 2026
998d69a
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 25, 2026
f684da4
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 26, 2026
e050399
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 26, 2026
43dccde
Merge branch 'apache:master' into SPARK-54449
VindhyaG Feb 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ private[spark] abstract class MemoryManager(
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val maxOffHeapMemory =
if (conf.get(MEMORY_OFFHEAP_ENABLED)) conf.get(MEMORY_OFFHEAP_SIZE) else 0
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
*/
protected def createMemoryManager(
maxOnHeapExecutionMemory: Long,
maxOffHeapExecutionMemory: Long = 0L): MemoryManager
maxOffHeapExecutionMemory: Long = 0L,
isOffHeapEnabled: Boolean = true ): MemoryManager


// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
Expand Down Expand Up @@ -318,7 +320,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
test("off-heap execution allocations cannot exceed limit") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 2L,
maxOffHeapExecutionMemory = 1000L)
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val c = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
Expand All @@ -336,10 +338,10 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
}

test("task peak execution memory usage") {
test("task peak execution memory usage when offheap memory is enabled") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L)
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
Expand All @@ -353,6 +355,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
}

test("task peak execution memory usage when offheap memory is disabled") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = false)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)

val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
assert(result1 === 0L)
assert(result2 === 400L)
assert(tMemManager.getMemoryConsumptionForThisTask === 400L)
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 0L)
}
}

private object MemoryManagerSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes

override protected def createMemoryManager(
maxOnHeapExecutionMemory: Long,
maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
maxOffHeapExecutionMemory: Long,
isOffHeapEnabled: Boolean = false ): UnifiedMemoryManager = {
val conf = new SparkConf()
.set(MEMORY_FRACTION, 1.0)
.set(TEST_MEMORY, maxOnHeapExecutionMemory)
.set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
.set(MEMORY_STORAGE_FRACTION, storageFraction)
.set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
UnifiedMemoryManager(conf, numCores = 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
memoryManager: Option[UnifiedMemoryManager] = None,
isOffHeapEnabled: Boolean = false ): BlockManager = {
conf.set(TEST_MEMORY, maxMem)
conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
conf.set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
val serializerManager = new SerializerManager(serializer, conf)
val transfer = new NettyBlockTransferService(
conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
Expand All @@ -98,6 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf.set(MEMORY_FRACTION, 1.0)
conf.set(MEMORY_STORAGE_FRACTION, 0.999)
conf.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
conf.set(MEMORY_OFFHEAP_ENABLED, false)

// to make cached peers refresh frequently
conf.set(STORAGE_CACHED_PEERS_TTL, 10)
Expand Down Expand Up @@ -403,7 +406,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite

// As many stores as the replication factor
val stores = (1 to maxReplication).map {
i => makeBlockManager(storeSize, s"store$i")
i => makeBlockManager(storeSize, s"store$i", isOffHeapEnabled = true)
}

storageLevels.foreach { storageLevel =>
Expand Down Expand Up @@ -587,6 +590,7 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
new SparkConf(false)
.set("spark.app.id", "test")
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(
STORAGE_REPLICATION_POLICY,
classOf[BasicBlockReplicationPolicy].getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
bmConf.set(TEST_MEMORY, maxMem)
bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem)
if(maxMem > 0) {
bmConf.set(MEMORY_OFFHEAP_ENABLED, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does MEMORY_OFFHEAP_ENABLED depend on maxMem?

Copy link
Contributor Author

@VindhyaG VindhyaG Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the UTs wtih the tests where if maxMem set specifically greather than 0 only then set the MEMORY_OFFHEAP_ENABLED to true else keep it default false. Earlier MEMORY_OFFHEAP_ENABLED was always set to default false and only MEMORY_OFFHEAP_SIZE value was used to test all the combinations because earlier MEMORY_OFFHEAP_ENABLED true needed MEMORY_OFFHEAP_SIZE > 0 and not vice versa. MEMORY_OFFHEAP_ENABLED value did not matter at all for MEMORY_OFFHEAP_SIZE tests. We cannot do that with this change where if MEMORY_OFFHEAP_ENABLED is false maxMem is essentially zero as well.

}
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
.set(UI_KILL_ENABLED, killEnabled)
.set(UI_TIMELINE_ENABLED, timelineEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
.set(MEMORY_OFFHEAP_ENABLED, true)
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
Expand Down