Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit d3c79b7

Browse files
10110346cloud-fan
authored andcommitted
[SPARK-21090][CORE] Optimize the unified memory manager code
## What changes were proposed in this pull request? 1.In `acquireStorageMemory`, when the Memory Mode is OFF_HEAP ,the `maxOffHeapMemory` should be modified to `maxOffHeapStorageMemory`. after this PR,it will same as ON_HEAP Memory Mode. Because when acquire memory is between `maxOffHeapStorageMemory` and `maxOffHeapMemory`,it will fail surely, so if acquire memory is greater than `maxOffHeapStorageMemory`(not greater than `maxOffHeapMemory`),we should fail fast. 2. Borrow memory from execution, `numBytes` modified to `numBytes - storagePool.memoryFree` will be more reasonable. Because we just acquire `(numBytes - storagePool.memoryFree)`, unnecessary borrowed `numBytes` from execution ## How was this patch tested? added unit test case Author: liuxian <[email protected]> Closes apache#18296 from 10110346/wip-lx-0614. (cherry picked from commit 112bd9b) Signed-off-by: Wenchen Fan <[email protected]>
1 parent c0d4acc commit d3c79b7

File tree

3 files changed

+36
-3
lines changed

3 files changed

+36
-3
lines changed

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
160160
case MemoryMode.OFF_HEAP => (
161161
offHeapExecutionMemoryPool,
162162
offHeapStorageMemoryPool,
163-
maxOffHeapMemory)
163+
maxOffHeapStorageMemory)
164164
}
165165
if (numBytes > maxMemory) {
166166
// Fail fast if the block simply won't fit
@@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] (
171171
if (numBytes > storagePool.memoryFree) {
172172
// There is not enough free memory in the storage pool, so try to borrow free memory from
173173
// the execution pool.
174-
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
174+
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
175+
numBytes - storagePool.memoryFree)
175176
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
176177
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
177178
}

core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
117117
evictBlocksToFreeSpaceCalled.set(numBytesToFree)
118118
if (numBytesToFree <= mm.storageMemoryUsed) {
119119
// We can evict enough blocks to fulfill the request for space
120-
mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
120+
mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
121121
evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
122122
numBytesToFree
123123
} else {

core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
303303
mm.invokePrivate[Unit](assertInvariants())
304304
}
305305

306+
test("not enough free memory in the storage pool --OFF_HEAP") {
307+
val conf = new SparkConf()
308+
.set("spark.memory.offHeap.size", "1000")
309+
.set("spark.testing.memory", "1000")
310+
.set("spark.memory.offHeap.enabled", "true")
311+
val taskAttemptId = 0L
312+
val mm = UnifiedMemoryManager(conf, numCores = 1)
313+
val ms = makeMemoryStore(mm)
314+
val memoryMode = MemoryMode.OFF_HEAP
315+
316+
assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L)
317+
assert(mm.storageMemoryUsed === 0L)
318+
assert(mm.executionMemoryUsed === 400L)
319+
320+
// Fail fast
321+
assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
322+
assert(mm.storageMemoryUsed === 0L)
323+
324+
assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
325+
assert(mm.storageMemoryUsed === 100L)
326+
assertEvictBlocksToFreeSpaceNotCalled(ms)
327+
328+
// Borrow 50 from execution memory
329+
assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode))
330+
assertEvictBlocksToFreeSpaceNotCalled(ms)
331+
assert(mm.storageMemoryUsed === 550L)
332+
333+
// Borrow 50 from execution memory and evict 50 to free space
334+
assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
335+
assertEvictBlocksToFreeSpaceCalled(ms, 50)
336+
assert(mm.storageMemoryUsed === 600L)
337+
}
306338
}

0 commit comments

Comments
 (0)