@zifeif2 zifeif2 commented Oct 6, 2025

…usage is enabled

What changes were proposed in this pull request?

This PR changes how pinnedBlocksMemUsage is calculated when an instance has bounded memory enabled.
Before the change, we always reported getDBProperty("rocksdb.block-cache-pinned-usage"), which returns the size of the pinned blocks requested by an instance. This is not accurate when instances share the same cache, because instances might share the same pinned blocks.

After this change, when isMemoryBounded is enabled for an instance, we call lruCache.getPinnedUsage() to get the total memory usage of the SHARED pinned blocks and divide that global usage by the number of IS_MEMORY_BOUNDED instances. This is because RocksDBMemoryManager returns the same cache for all instances that have isMemoryBounded = true.
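
The accounting described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this patch: `PinnedUsageSketch`, `pinnedUsageFor`, and the `boundedById` map are hypothetical names, and the fallback for unbounded or unregistered instances (returning the per-instance requested value) is an assumption, since that behavior is still being discussed in review.

```scala
// Hypothetical sketch; names and the fallback behavior are assumptions,
// not the actual RocksDBMemoryManager implementation.
object PinnedUsageSketch {

  /**
   * @param uniqueId             id of the instance being reported on
   * @param boundedById          instance id -> whether bounded memory is enabled
   * @param requestedPinnedUsage per-instance value, e.g. from
   *                             getDBProperty("rocksdb.block-cache-pinned-usage")
   * @param globalPinnedUsage    shared-cache value, e.g. from lruCache.getPinnedUsage()
   */
  def pinnedUsageFor(
      uniqueId: String,
      boundedById: Map[String, Boolean],
      requestedPinnedUsage: Long,
      globalPinnedUsage: Long): Long = {
    boundedById.get(uniqueId) match {
      case Some(true) =>
        // This instance is bounded, so the count below is at least 1.
        val numBoundedInstances = boundedById.values.count(b => b)
        // Split the shared pinned usage evenly (integer division).
        globalPinnedUsage / numBoundedInstances
      case _ =>
        // Unbounded or unregistered: assumed to report its own requested usage.
        requestedPinnedUsage
    }
  }
}
```

For example, with two bounded instances and one unbounded instance sharing a process, a global pinned usage of 300 bytes would be reported as 150 bytes for each bounded instance, while the unbounded instance would keep reporting its own value.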

Unit test:
build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.sql.execution.streaming.state.RocksDBSuite test

Why are the changes needed?

See above for why this is a bug.
This fix prevents us from over-reporting pinnedBlocksMemUsage when isMemoryBounded is enabled for an instance.

Does this PR introduce any user-facing change?

No

How was this patch tested?

See added unit tests on the correctness of the calculation.

Was this patch authored or co-authored using generative AI tooling?

Yes. Generated-by cursor and 'claude-4-sonnet'

// In unbounded memory mode, each instance has its own memory

// Question: Should we return 0L if the instance is not registered in instanceMemoryMap?
// But when would it not be registered?
Contributor

If not registered, we currently just return the totalMemory passed in. Why do you think 0L is better?

    requestedPinnedUsage: Long): Long = {
  if (!instanceMemoryMap.containsKey(uniqueId)) {
    // Instance not registered, return 0
    return 0L
Contributor

why not just return the requestedPinnedUsage instead of 0L?

if (numBoundedInstances > 0) {
  globalPinnedUsage / numBoundedInstances
} else {
  0L
Contributor

When would this happen? We already found a boundedMemory instance for this ID when we did instanceMemoryMap.get(uniqueId) above, so I don't think this is necessary.

// Use RocksDBMemoryManager to calculate the memory usage accounting
val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)

val requestedPinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
Contributor

do we actually need the getDBProperty("rocksdb.block-cache-pinned-usage") here? Because in unbounded mode, the getDBProperty("rocksdb.block-cache-pinned-usage") will be equal to lruCache.getPinnedUsage()


def listFiles(file: String): Seq[File] = listFiles(new File(file))

test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
Contributor

nit: move this closer to the other memory manager test case
