Skip to content

Commit adc1a70

Browse files
author
Ubuntu
committed
address comments
1 parent 42846f6 commit adc1a70

File tree

3 files changed

+54
-66
lines changed

3 files changed

+54
-66
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,10 +1495,9 @@ class RocksDB(
14951495
// Use RocksDBMemoryManager to calculate the memory usage accounting
14961496
val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)
14971497

1498-
val requestedPinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
1499-
val globalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
1498+
val totalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
15001499
val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
1501-
instanceUniqueId, globalPinnedBlocksMemUsage, requestedPinnedBlocksMemUsage)
1500+
instanceUniqueId, totalPinnedBlocksMemUsage)
15021501

15031502
RocksDBMetrics(
15041503
numKeysOnLoadedVersion,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
117117
totalMemoryUsage / numBoundedInstances
118118
} else {
119119
// In unbounded memory mode, each instance has its own memory
120-
121-
// Question: Should we return 0L if the instance is not registed in instanceMemoryMap?
122-
// but when it is not registed?
123120
totalMemoryUsage
124121
}
125122
}
@@ -133,26 +130,17 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
133130
*/
134131
def getInstancePinnedBlocksMemUsage(
135132
uniqueId: String,
136-
globalPinnedUsage: Long,
137-
requestedPinnedUsage: Long): Long = {
138-
if (!instanceMemoryMap.containsKey(uniqueId)) {
139-
// Instance not registered, return 0
140-
return 0L
141-
}
142-
143-
val instanceInfo = instanceMemoryMap.get(uniqueId)
133+
pinnedUsage: Long): Long = {
134+
val instanceInfo = instanceMemoryMap.
135+
getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false))
144136
if (instanceInfo.isBoundedMemory) {
145137
// In bounded memory mode, divide by the number of bounded instances
146138
// since they share the same cache
147139
val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */)
148-
if (numBoundedInstances > 0) {
149-
globalPinnedUsage / numBoundedInstances
150-
} else {
151-
0L
152-
}
140+
pinnedUsage / numBoundedInstances
153141
} else {
154142
// In unbounded memory mode, each instance has its own cache
155-
requestedPinnedUsage
143+
pinnedUsage
156144
}
157145
}
158146

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2668,6 +2668,53 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
26682668
}
26692669
}
26702670

2671+
test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
2672+
try {
2673+
// Clear any existing providers from previous tests
2674+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
2675+
2676+
val boundedMemoryId1 = "test-instance-1"
2677+
val boundedMemoryId2 = "test-instance-2"
2678+
val unboundedMemoryId = "test-instance"
2679+
val cacheUsage = 1000L
2680+
val cacheUsage1 = 300L // This should be ignored for bounded memory
2681+
2682+
// Register two bounded memory instances and one unbounded memory instance
2683+
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
2684+
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
2685+
RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)
2686+
2687+
// Test that both bounded instances get the same divided share of the pinned usage
2688+
val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2689+
boundedMemoryId1,
2690+
cacheUsage)
2691+
val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2692+
boundedMemoryId2,
2693+
cacheUsage)
2694+
val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2695+
unboundedMemoryId,
2696+
cacheUsage1)
2697+
2698+
// With 2 bounded instances, each should get half of the shared pinned usage
2699+
assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
2700+
assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
2701+
assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")
2702+
2703+
// Test with an unregistered instance: it is treated as unbounded and gets the full pinned usage
2704+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
2705+
val nonexistInstanceRes = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
2706+
boundedMemoryId1,
2707+
cacheUsage)
2708+
assert(
2709+
nonexistInstanceRes === cacheUsage,
2710+
s"Expected $cacheUsage when no instances, got $nonexistInstanceRes"
2711+
)
2712+
} finally {
2713+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
2714+
}
2715+
}
2716+
2717+
26712718
testWithColumnFamilies("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart",
26722719
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
26732720
withTempDir { dir =>
@@ -3672,52 +3719,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
36723719

36733720
def listFiles(file: String): Seq[File] = listFiles(new File(file))
36743721

3675-
test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
3676-
try {
3677-
// Clear any existing providers from previous tests
3678-
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3679-
3680-
val boundedMemoryId1 = "test-instance-1"
3681-
val boundedMemoryId2 = "test-instance-2"
3682-
val unboundedMemoryId = "test-instance"
3683-
val sharedCachePinnedUsage = 1000L
3684-
val requestedPinnedUsage = 300L // This should be ignored for bounded memory
3685-
3686-
// Register two bounded memory instances
3687-
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
3688-
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
3689-
RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)
3690-
3691-
// Test that both instances get the same divided value from globalPinnedUsage
3692-
val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3693-
boundedMemoryId1,
3694-
sharedCachePinnedUsage,
3695-
requestedPinnedUsage)
3696-
val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3697-
boundedMemoryId2,
3698-
sharedCachePinnedUsage,
3699-
requestedPinnedUsage)
3700-
val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3701-
unboundedMemoryId,
3702-
sharedCachePinnedUsage,
3703-
requestedPinnedUsage)
3704-
3705-
// With 2 bounded instances, each should get half of globalPinnedUsage
3706-
assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
3707-
assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
3708-
assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")
3709-
3710-
// Test with zero instances (unregistered instance)
3711-
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3712-
val resultZero = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3713-
boundedMemoryId1,
3714-
sharedCachePinnedUsage,
3715-
requestedPinnedUsage)
3716-
assert(resultZero === 0L, s"Expected 0L when no instances, got $resultZero")
3717-
} finally {
3718-
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3719-
}
3720-
}
37213722
}
37223723

37233724
object RocksDBSuite {

0 commit comments

Comments
 (0)