diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 774ed23ed55be..c4dfe39f6744c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1461,7 +1461,6 @@ class RocksDB( private def metrics: RocksDBMetrics = { import HistogramType._ val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size") - val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage") val nativeOpsHistograms = Seq( "get" -> DB_GET, "put" -> DB_WRITE, @@ -1497,6 +1496,10 @@ class RocksDB( // Use RocksDBMemoryManager to calculate the memory usage accounting val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage) + val totalPinnedBlocksMemUsage = lruCache.getPinnedUsage() + val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + instanceUniqueId, totalPinnedBlocksMemUsage) + RocksDBMetrics( numKeysOnLoadedVersion, numKeysOnWritingVersion, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala index 82ee1803b3172..c96e51bcba9cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala @@ -121,6 +121,29 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer { } } + /** + * Get the pinned blocks memory usage for a specific instance, accounting for bounded memory + * sharing. + * @param uniqueId The instance's unique identifier + * @param totalPinnedUsage The total pinned usage from the cache + * @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode + */ + def getInstancePinnedBlocksMemUsage( + uniqueId: String, + totalPinnedUsage: Long): Long = { + val instanceInfo = instanceMemoryMap. + getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false)) + if (instanceInfo.isBoundedMemory) { + // In bounded memory mode, divide by the number of bounded instances + // since they share the same cache + val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */) + totalPinnedUsage / numBoundedInstances + } else { + // In unbounded memory mode, each instance has its own cache + totalPinnedUsage + } + } + def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf): (WriteBufferManager, Cache) = synchronized { // Register with UnifiedMemoryManager (idempotent operation) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 801e74d288b4a..94e3bb208bf46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2788,6 +2788,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") { + try { + // Clear any existing providers from previous tests + RocksDBMemoryManager.resetWriteBufferManagerAndCache + + val boundedMemoryId1 = "test-instance-1" + val boundedMemoryId2 = "test-instance-2" + val unboundedMemoryId = "test-instance" + val cacheUsage = 1000L + val cacheUsage1 = 300L // This should be ignored for bounded memory + + // Register two bounded memory instances + RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true) + RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true) + RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false) + + // Test that both instances get the same divided value from globalPinnedUsage + val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId1, + cacheUsage) + val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId2, + cacheUsage) + val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + unboundedMemoryId, + cacheUsage1) + + // With 2 bounded instances, each should get half of globalPinnedUsage + assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1") + assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2") + assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3") + + // Test with zero instances (unregistered instance) + RocksDBMemoryManager.resetWriteBufferManagerAndCache + val nonexistInstanceRes = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage( + boundedMemoryId1, + cacheUsage) + assert( + nonexistInstanceRes === cacheUsage, + s"Expected $cacheUsage when no instances, got $nonexistInstanceRes" + ) + } finally { + RocksDBMemoryManager.resetWriteBufferManagerAndCache + } + } + testWithColumnFamilies("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart", TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => withTempDir { dir =>