From a0ea4a4efecbd0effbfe1897a814ff2d15949423 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 2 Oct 2025 05:35:27 +0000 Subject: [PATCH 1/3] [SPARK-53792] Fix rocksdbPinnedBlocksMemoryUsage when bounded memory usage is enabled --- .../execution/streaming/state/RocksDB.scala | 6 ++- .../state/RocksDBMemoryManager.scala | 35 ++++++++++++++ .../streaming/state/RocksDBSuite.scala | 47 +++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 774ed23ed55be..42bb4774c2be3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1461,7 +1461,6 @@ class RocksDB( private def metrics: RocksDBMetrics = { import HistogramType._ val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size") - val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage") val nativeOpsHistograms = Seq( "get" -> DB_GET, "put" -> DB_WRITE, @@ -1497,6 +1496,11 @@ class RocksDB( // Use RocksDBMemoryManager to calculate the memory usage accounting val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage) + val requestedPinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage") + val globalPinnedBlocksMemUsage = lruCache.getPinnedUsage() + val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + instanceUniqueId, globalPinnedBlocksMemUsage, requestedPinnedBlocksMemUsage) + RocksDBMetrics( numKeysOnLoadedVersion, numKeysOnWritingVersion, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala index 82ee1803b3172..5763e4ea41b48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala @@ -117,10 +117,45 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer { totalMemoryUsage / numBoundedInstances } else { // In unbounded memory mode, each instance has its own memory + + // Question: Should we return 0L if the instance is not registed in instanceMemoryMap? + // but when it is not registed? totalMemoryUsage } } + /** + * Get the pinned blocks memory usage for a specific instance, accounting for bounded memory + * sharing. + * @param uniqueId The instance's unique identifier + * @param globalPinnedUsage The total pinned usage from the cache + * @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode + */ + def getInstancePinnedBlocksMemUsage( + uniqueId: String, + globalPinnedUsage: Long, + requestedPinnedUsage: Long): Long = { + if (!instanceMemoryMap.containsKey(uniqueId)) { + // Instance not registered, return 0 + return 0L + } + + val instanceInfo = instanceMemoryMap.get(uniqueId) + if (instanceInfo.isBoundedMemory) { + // In bounded memory mode, divide by the number of bounded instances + // since they share the same cache + val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */) + if (numBoundedInstances > 0) { + globalPinnedUsage / numBoundedInstances + } else { + 0L + } + } else { + // In unbounded memory mode, each instance has its own cache + requestedPinnedUsage + } + } + def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf): (WriteBufferManager, Cache) = synchronized { // Register with UnifiedMemoryManager (idempotent operation) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 801e74d288b4a..61011f32b5a78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3791,6 +3791,53 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } def listFiles(file: String): Seq[File] = listFiles(new File(file)) + + test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") { + try { + // Clear any existing providers from previous tests + RocksDBMemoryManager.resetWriteBufferManagerAndCache + + val boundedMemoryId1 = "test-instance-1" + val boundedMemoryId2 = "test-instance-2" + val unboundedMemoryId = "test-instance" + val sharedCachePinnedUsage = 1000L + val requestedPinnedUsage = 300L // This should be ignored for bounded memory + + // Register two bounded memory instances + RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true) + RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true) + RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false) + + // Test that both instances get the same divided value from globalPinnedUsage + val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId1, + sharedCachePinnedUsage, + requestedPinnedUsage) + val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId2, + sharedCachePinnedUsage, + requestedPinnedUsage) + val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + unboundedMemoryId, + sharedCachePinnedUsage, + requestedPinnedUsage) + + // With 2 bounded instances, each should get half of globalPinnedUsage + assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1") + assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2") + assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3") + + // Test with zero instances (unregistered instance) + RocksDBMemoryManager.resetWriteBufferManagerAndCache + val resultZero = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId1, + sharedCachePinnedUsage, + requestedPinnedUsage) + assert(resultZero === 0L, s"Expected 0L when no instances, got $resultZero") + } finally { + RocksDBMemoryManager.resetWriteBufferManagerAndCache + } + } } object RocksDBSuite { From 1ac1e8a220ebeb07cb91692204b342b6c331acb1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 7 Oct 2025 07:28:48 +0000 Subject: [PATCH 2/3] address comments --- .../execution/streaming/state/RocksDB.scala | 5 +- .../state/RocksDBMemoryManager.scala | 22 +---- .../streaming/state/RocksDBSuite.scala | 93 +++++++++---------- 3 files changed, 53 insertions(+), 67 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 42bb4774c2be3..c4dfe39f6744c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1496,10 +1496,9 @@ class RocksDB( // Use RocksDBMemoryManager to calculate the memory usage accounting val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage) - val requestedPinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage") - val globalPinnedBlocksMemUsage = lruCache.getPinnedUsage() + val totalPinnedBlocksMemUsage = lruCache.getPinnedUsage() val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( - instanceUniqueId, globalPinnedBlocksMemUsage, requestedPinnedBlocksMemUsage) + instanceUniqueId, totalPinnedBlocksMemUsage) RocksDBMetrics( numKeysOnLoadedVersion, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala index 5763e4ea41b48..bac1531bce3ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala @@ -117,9 +117,6 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer { totalMemoryUsage / numBoundedInstances } else { // In unbounded memory mode, each instance has its own memory - - // Question: Should we return 0L if the instance is not registed in instanceMemoryMap? - // but when it is not registed? totalMemoryUsage } } @@ -133,26 +130,17 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer { */ def getInstancePinnedBlocksMemUsage( uniqueId: String, - globalPinnedUsage: Long, - requestedPinnedUsage: Long): Long = { - if (!instanceMemoryMap.containsKey(uniqueId)) { - // Instance not registered, return 0 - return 0L - } - - val instanceInfo = instanceMemoryMap.get(uniqueId) + pinnedUsage: Long): Long = { + val instanceInfo = instanceMemoryMap. + getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false)) if (instanceInfo.isBoundedMemory) { // In bounded memory mode, divide by the number of bounded instances // since they share the same cache val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */) - if (numBoundedInstances > 0) { - globalPinnedUsage / numBoundedInstances - } else { - 0L - } + pinnedUsage / numBoundedInstances } else { // In unbounded memory mode, each instance has its own cache - requestedPinnedUsage + pinnedUsage } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 61011f32b5a78..94e3bb208bf46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2788,6 +2788,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") { + try { + // Clear any existing providers from previous tests + RocksDBMemoryManager.resetWriteBufferManagerAndCache + + val boundedMemoryId1 = "test-instance-1" + val boundedMemoryId2 = "test-instance-2" + val unboundedMemoryId = "test-instance" + val cacheUsage = 1000L + val cacheUsage1 = 300L // This should be ignored for bounded memory + + // Register two bounded memory instances + RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true) + RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true) + RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false) + + // Test that both instances get the same divided value from globalPinnedUsage + val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId1, + cacheUsage) + val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId2, + cacheUsage) + val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + unboundedMemoryId, + cacheUsage1) + + // With 2 bounded instances, each should get half of globalPinnedUsage + assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1") + assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2") + assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3") + + // Test with zero instances (unregistered instance) + RocksDBMemoryManager.resetWriteBufferManagerAndCache + val nonexistInstanceRes = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId1, + cacheUsage) + assert( + nonexistInstanceRes === cacheUsage, + s"Expected $cacheUsage when no instances, got $nonexistInstanceRes" + ) + } finally { + RocksDBMemoryManager.resetWriteBufferManagerAndCache + } + } + testWithColumnFamilies("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart", TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => withTempDir { dir => @@ -3791,53 +3837,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } def listFiles(file: String): Seq[File] = listFiles(new File(file)) - - test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") { - try { - // Clear any existing providers from previous tests - RocksDBMemoryManager.resetWriteBufferManagerAndCache - - val boundedMemoryId1 = "test-instance-1" - val boundedMemoryId2 = "test-instance-2" - val unboundedMemoryId = "test-instance" - val sharedCachePinnedUsage = 1000L - val requestedPinnedUsage = 300L // This should be ignored for bounded memory - - // Register two bounded memory instances - RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true) - RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true) - RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false) - - // Test that both instances get the same divided value from globalPinnedUsage - val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( - boundedMemoryId1, - sharedCachePinnedUsage, - requestedPinnedUsage) - val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( - boundedMemoryId2, - sharedCachePinnedUsage, - requestedPinnedUsage) - val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( - unboundedMemoryId, - sharedCachePinnedUsage, - requestedPinnedUsage) - - // With 2 bounded instances, each should get half of globalPinnedUsage - assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1") - assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2") - assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3") - - // Test with zero instances (unregistered instance) - RocksDBMemoryManager.resetWriteBufferManagerAndCache - val resultZero = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( - boundedMemoryId1, - sharedCachePinnedUsage, - requestedPinnedUsage) - assert(resultZero === 0L, s"Expected 0L when no instances, got $resultZero") - } finally { - RocksDBMemoryManager.resetWriteBufferManagerAndCache - } - } } object RocksDBSuite { From 9d6e603f0ae1829485a1e83313c282df656130f1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 7 Oct 2025 23:08:18 +0000 Subject: [PATCH 3/3] address comment --- .../execution/streaming/state/RocksDBMemoryManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala index bac1531bce3ff..c96e51bcba9cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala @@ -125,22 +125,22 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer { * Get the pinned blocks memory usage for a specific instance, accounting for bounded memory * sharing. * @param uniqueId The instance's unique identifier - * @param globalPinnedUsage The total pinned usage from the cache + * @param totalPinnedUsage The total pinned usage from the cache * @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode */ def getInstancePinnedBlocksMemUsage( uniqueId: String, - pinnedUsage: Long): Long = { + totalPinnedUsage: Long): Long = { val instanceInfo = instanceMemoryMap. getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false)) if (instanceInfo.isBoundedMemory) { // In bounded memory mode, divide by the number of bounded instances // since they share the same cache val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */) - pinnedUsage / numBoundedInstances + totalPinnedUsage / numBoundedInstances } else { // In unbounded memory mode, each instance has its own cache - pinnedUsage + totalPinnedUsage } }