Skip to content

Commit a0ea4a4

Browse files
author
Ubuntu
committed
[SPARK-53792] Fix rocksdbPinnedBlocksMemoryUsage when bounded memory usage is enabled
1 parent fd02372 commit a0ea4a4

File tree

3 files changed

+87
-1
lines changed

3 files changed

+87
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1461,7 +1461,6 @@ class RocksDB(
14611461
private def metrics: RocksDBMetrics = {
14621462
import HistogramType._
14631463
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
1464-
val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
14651464
val nativeOpsHistograms = Seq(
14661465
"get" -> DB_GET,
14671466
"put" -> DB_WRITE,
@@ -1497,6 +1496,11 @@ class RocksDB(
14971496
// Use RocksDBMemoryManager to calculate the memory usage accounting
14981497
val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)
14991498

1499+
val requestedPinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
1500+
val globalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
1501+
val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
1502+
instanceUniqueId, globalPinnedBlocksMemUsage, requestedPinnedBlocksMemUsage)
1503+
15001504
RocksDBMetrics(
15011505
numKeysOnLoadedVersion,
15021506
numKeysOnWritingVersion,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,45 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
117117
totalMemoryUsage / numBoundedInstances
118118
} else {
119119
// In unbounded memory mode, each instance has its own memory
120+
121+
// Question: Should we return 0L if the instance is not registed in instanceMemoryMap?
122+
// but when it is not registed?
120123
totalMemoryUsage
121124
}
122125
}
123126

127+
/**
128+
* Get the pinned blocks memory usage for a specific instance, accounting for bounded memory
129+
* sharing.
130+
* @param uniqueId The instance's unique identifier
131+
* @param globalPinnedUsage The total pinned usage from the cache
132+
* @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode
133+
*/
134+
def getInstancePinnedBlocksMemUsage(
135+
uniqueId: String,
136+
globalPinnedUsage: Long,
137+
requestedPinnedUsage: Long): Long = {
138+
if (!instanceMemoryMap.containsKey(uniqueId)) {
139+
// Instance not registered, return 0
140+
return 0L
141+
}
142+
143+
val instanceInfo = instanceMemoryMap.get(uniqueId)
144+
if (instanceInfo.isBoundedMemory) {
145+
// In bounded memory mode, divide by the number of bounded instances
146+
// since they share the same cache
147+
val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */)
148+
if (numBoundedInstances > 0) {
149+
globalPinnedUsage / numBoundedInstances
150+
} else {
151+
0L
152+
}
153+
} else {
154+
// In unbounded memory mode, each instance has its own cache
155+
requestedPinnedUsage
156+
}
157+
}
158+
124159
def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf): (WriteBufferManager, Cache)
125160
= synchronized {
126161
// Register with UnifiedMemoryManager (idempotent operation)

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3791,6 +3791,53 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
37913791
}
37923792

37933793
def listFiles(file: String): Seq[File] = listFiles(new File(file))
3794+
3795+
test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
3796+
try {
3797+
// Clear any existing providers from previous tests
3798+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3799+
3800+
val boundedMemoryId1 = "test-instance-1"
3801+
val boundedMemoryId2 = "test-instance-2"
3802+
val unboundedMemoryId = "test-instance"
3803+
val sharedCachePinnedUsage = 1000L
3804+
val requestedPinnedUsage = 300L // This should be ignored for bounded memory
3805+
3806+
// Register two bounded memory instances
3807+
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
3808+
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
3809+
RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)
3810+
3811+
// Test that both instances get the same divided value from globalPinnedUsage
3812+
val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3813+
boundedMemoryId1,
3814+
sharedCachePinnedUsage,
3815+
requestedPinnedUsage)
3816+
val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3817+
boundedMemoryId2,
3818+
sharedCachePinnedUsage,
3819+
requestedPinnedUsage)
3820+
val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3821+
unboundedMemoryId,
3822+
sharedCachePinnedUsage,
3823+
requestedPinnedUsage)
3824+
3825+
// With 2 bounded instances, each should get half of globalPinnedUsage
3826+
assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
3827+
assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
3828+
assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")
3829+
3830+
// Test with zero instances (unregistered instance)
3831+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3832+
val resultZero = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
3833+
boundedMemoryId1,
3834+
sharedCachePinnedUsage,
3835+
requestedPinnedUsage)
3836+
assert(resultZero === 0L, s"Expected 0L when no instances, got $resultZero")
3837+
} finally {
3838+
RocksDBMemoryManager.resetWriteBufferManagerAndCache
3839+
}
3840+
}
37943841
}
37953842

37963843
object RocksDBSuite {

0 commit comments

Comments
 (0)