Skip to content

Commit 1ac1e8a

Browse files
author
Ubuntu
committed
address comments
1 parent a0ea4a4 commit 1ac1e8a

File tree

3 files changed

+53
-67
lines changed

3 files changed

+53
-67
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,10 +1496,9 @@ class RocksDB(
14961496
// Use RocksDBMemoryManager to calculate the memory usage accounting
14971497
val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)
14981498

1499-
val requestedPinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
1500-
val globalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
1499+
val totalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
15011500
val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
1502-
instanceUniqueId, globalPinnedBlocksMemUsage, requestedPinnedBlocksMemUsage)
1501+
instanceUniqueId, totalPinnedBlocksMemUsage)
15031502

15041503
RocksDBMetrics(
15051504
numKeysOnLoadedVersion,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
117117
totalMemoryUsage / numBoundedInstances
118118
} else {
119119
// In unbounded memory mode, each instance has its own memory
120-
121-
// Question: Should we return 0L if the instance is not registered in instanceMemoryMap?
122-
// But when would it not be registered?
123120
totalMemoryUsage
124121
}
125122
}
@@ -133,26 +130,17 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
133130
*/
134131
def getInstancePinnedBlocksMemUsage(
135132
uniqueId: String,
136-
globalPinnedUsage: Long,
137-
requestedPinnedUsage: Long): Long = {
138-
if (!instanceMemoryMap.containsKey(uniqueId)) {
139-
// Instance not registered, return 0
140-
return 0L
141-
}
142-
143-
val instanceInfo = instanceMemoryMap.get(uniqueId)
133+
pinnedUsage: Long): Long = {
134+
val instanceInfo = instanceMemoryMap.
135+
getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false))
144136
if (instanceInfo.isBoundedMemory) {
145137
// In bounded memory mode, divide by the number of bounded instances
146138
// since they share the same cache
147139
val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */)
148-
if (numBoundedInstances > 0) {
149-
globalPinnedUsage / numBoundedInstances
150-
} else {
151-
0L
152-
}
140+
pinnedUsage / numBoundedInstances
153141
} else {
154142
// In unbounded memory mode, each instance has its own cache
155-
requestedPinnedUsage
143+
pinnedUsage
156144
}
157145
}
158146

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 46 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2788,6 +2788,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
27882788
}
27892789
}
27902790

2791+
test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
2792+
try {
2793+
// Clear any existing providers from previous tests
2794+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
2795+
2796+
val boundedMemoryId1 = "test-instance-1"
2797+
val boundedMemoryId2 = "test-instance-2"
2798+
val unboundedMemoryId = "test-instance"
2799+
val cacheUsage = 1000L
2800+
val cacheUsage1 = 300L // This should be ignored for bounded memory
2801+
2802+
// Register two bounded memory instances and one unbounded memory instance
2803+
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
2804+
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
2805+
RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)
2806+
2807+
// Test that both bounded instances get the same divided share of the pinned usage
2808+
val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2809+
boundedMemoryId1,
2810+
cacheUsage)
2811+
val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2812+
boundedMemoryId2,
2813+
cacheUsage)
2814+
val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2815+
unboundedMemoryId,
2816+
cacheUsage1)
2817+
2818+
// With 2 bounded instances, each gets half of cacheUsage; the unbounded one gets all of cacheUsage1
2819+
assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
2820+
assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
2821+
assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")
2822+
2823+
// Test with an unregistered instance: it is treated as unbounded, so the full usage is returned
2824+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
2825+
val nonexistInstanceRes = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2826+
boundedMemoryId1,
2827+
cacheUsage)
2828+
assert(
2829+
nonexistInstanceRes === cacheUsage,
2830+
s"Expected $cacheUsage when no instances, got $nonexistInstanceRes"
2831+
)
2832+
} finally {
2833+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
2834+
}
2835+
}
2836+
27912837
testWithColumnFamilies("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart",
27922838
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
27932839
withTempDir { dir =>
@@ -3791,53 +3837,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
37913837
}
37923838

37933839
def listFiles(file: String): Seq[File] = listFiles(new File(file))
3794-
3795-
test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
3796-
try {
3797-
// Clear any existing providers from previous tests
3798-
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3799-
3800-
val boundedMemoryId1 = "test-instance-1"
3801-
val boundedMemoryId2 = "test-instance-2"
3802-
val unboundedMemoryId = "test-instance"
3803-
val sharedCachePinnedUsage = 1000L
3804-
val requestedPinnedUsage = 300L // This should be ignored for bounded memory
3805-
3806-
// Register two bounded memory instances
3807-
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
3808-
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
3809-
RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)
3810-
3811-
// Test that both instances get the same divided value from globalPinnedUsage
3812-
val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3813-
boundedMemoryId1,
3814-
sharedCachePinnedUsage,
3815-
requestedPinnedUsage)
3816-
val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3817-
boundedMemoryId2,
3818-
sharedCachePinnedUsage,
3819-
requestedPinnedUsage)
3820-
val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3821-
unboundedMemoryId,
3822-
sharedCachePinnedUsage,
3823-
requestedPinnedUsage)
3824-
3825-
// With 2 bounded instances, each should get half of globalPinnedUsage
3826-
assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
3827-
assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
3828-
assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")
3829-
3830-
// Test with zero instances (unregistered instance)
3831-
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3832-
val resultZero = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3833-
boundedMemoryId1,
3834-
sharedCachePinnedUsage,
3835-
requestedPinnedUsage)
3836-
assert(resultZero === 0L, s"Expected 0L when no instances, got $resultZero")
3837-
} finally {
3838-
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3839-
}
3840-
}
38413840
}
38423841

38433842
object RocksDBSuite {

0 commit comments

Comments
 (0)