Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,6 @@ class RocksDB(
private def metrics: RocksDBMetrics = {
import HistogramType._
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
val nativeOpsHistograms = Seq(
"get" -> DB_GET,
"put" -> DB_WRITE,
Expand Down Expand Up @@ -1497,6 +1496,10 @@ class RocksDB(
// Use RocksDBMemoryManager to calculate the memory usage accounting
val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)

val totalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
val pinnedBlocksMemUsage = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
instanceUniqueId, totalPinnedBlocksMemUsage)

RocksDBMetrics(
numKeysOnLoadedVersion,
numKeysOnWritingVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,29 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
}
}

/**
* Get the pinned blocks memory usage for a specific instance, accounting for bounded memory
* sharing.
* @param uniqueId The instance's unique identifier
* @param globalPinnedUsage The total pinned usage from the cache
* @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode
*/
def getInstancePinnedBlocksMemUsage(
uniqueId: String,
pinnedUsage: Long): Long = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: param name pinnedUsage is different from what is in the description globalPinnedUsage. Lets just name this param as: totalPinnedUsage

val instanceInfo = instanceMemoryMap.
getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false))
if (instanceInfo.isBoundedMemory) {
// In bounded memory mode, divide by the number of bounded instances
// since they share the same cache
val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */)
pinnedUsage / numBoundedInstances
} else {
// In unbounded memory mode, each instance has its own cache
pinnedUsage
}
}

def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf): (WriteBufferManager, Cache)
= synchronized {
// Register with UnifiedMemoryManager (idempotent operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
try {
// Clear any existing providers from previous tests
RocksDBMemoryManager.resetWriteBufferManagerAndCache

val boundedMemoryId1 = "test-instance-1"
val boundedMemoryId2 = "test-instance-2"
val unboundedMemoryId = "test-instance"
val cacheUsage = 1000L
val cacheUsage1 = 300L // This should be ignored for bounded memory

// Register two bounded memory instances
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)

// Test that both instances get the same divided value from globalPinnedUsage
val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
boundedMemoryId1,
cacheUsage)
val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
boundedMemoryId2,
cacheUsage)
val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
unboundedMemoryId,
cacheUsage1)

// With 2 bounded instances, each should get half of globalPinnedUsage
assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")

// Test with zero instances (unregistered instance)
RocksDBMemoryManager.resetWriteBufferManagerAndCache
val nonexistInstanceRes = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
boundedMemoryId1,
cacheUsage)
assert(
nonexistInstanceRes === cacheUsage,
s"Expected $cacheUsage when no instances, got $nonexistInstanceRes"
)
} finally {
RocksDBMemoryManager.resetWriteBufferManagerAndCache
}
}

testWithColumnFamilies("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
withTempDir { dir =>
Expand Down