@@ -2788,6 +2788,53 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
2788
2788
}
2789
2789
}
2790
2790
2791
+ test(" SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage" ) {
2792
+ try {
2793
+ // Clear any existing providers from previous tests
2794
+ RocksDBMemoryManager .resetWriteBufferManagerAndCache
2795
+
2796
+ val boundedMemoryId1 = " test-instance-1"
2797
+ val boundedMemoryId2 = " test-instance-2"
2798
+ val unboundedMemoryId = " test-instance"
2799
+ val cacheUsage = 1000L
2800
+ val cacheUsage1 = 300L // This should be ignored for bounded memory
2801
+
2802
+ // Register two bounded memory instances
2803
+ RocksDBMemoryManager .updateMemoryUsage(boundedMemoryId1, 0L , isBoundedMemory = true )
2804
+ RocksDBMemoryManager .updateMemoryUsage(boundedMemoryId2, 0L , isBoundedMemory = true )
2805
+ RocksDBMemoryManager .updateMemoryUsage(unboundedMemoryId, 0L , isBoundedMemory = false )
2806
+
2807
+ // Test that both instances get the same divided value from globalPinnedUsage
2808
+ val result1 = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
2809
+ boundedMemoryId1,
2810
+ cacheUsage)
2811
+ val result2 = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
2812
+ boundedMemoryId2,
2813
+ cacheUsage)
2814
+ val result3 = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
2815
+ unboundedMemoryId,
2816
+ cacheUsage1)
2817
+
2818
+ // With 2 bounded instances, each should get half of globalPinnedUsage
2819
+ assert(result1 === 500L , s " Expected 500L for bounded instance 1, got $result1" )
2820
+ assert(result2 === 500L , s " Expected 500L for bounded instance 2, got $result2" )
2821
+ assert(result3 === 300L , s " Expected 300L for unbounded instance, got $result3" )
2822
+
2823
+ // Test with zero instances (unregistered instance)
2824
+ RocksDBMemoryManager .resetWriteBufferManagerAndCache
2825
+ val nonexistInstanceRes = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
2826
+ boundedMemoryId1,
2827
+ cacheUsage)
2828
+ assert(
2829
+ nonexistInstanceRes === cacheUsage,
2830
+ s " Expected $cacheUsage when no instances, got $nonexistInstanceRes"
2831
+ )
2832
+ } finally {
2833
+ RocksDBMemoryManager .resetWriteBufferManagerAndCache
2834
+ }
2835
+ }
2836
+
2837
+
2791
2838
testWithColumnFamilies(" SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart" ,
2792
2839
TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
2793
2840
withTempDir { dir =>
@@ -3792,52 +3839,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
3792
3839
3793
3840
def listFiles (file : String ): Seq [File ] = listFiles(new File (file))
3794
3841
3795
- test(" SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage" ) {
3796
- try {
3797
- // Clear any existing providers from previous tests
3798
- RocksDBMemoryManager .resetWriteBufferManagerAndCache
3799
-
3800
- val boundedMemoryId1 = " test-instance-1"
3801
- val boundedMemoryId2 = " test-instance-2"
3802
- val unboundedMemoryId = " test-instance"
3803
- val sharedCachePinnedUsage = 1000L
3804
- val requestedPinnedUsage = 300L // This should be ignored for bounded memory
3805
-
3806
- // Register two bounded memory instances
3807
- RocksDBMemoryManager .updateMemoryUsage(boundedMemoryId1, 0L , isBoundedMemory = true )
3808
- RocksDBMemoryManager .updateMemoryUsage(boundedMemoryId2, 0L , isBoundedMemory = true )
3809
- RocksDBMemoryManager .updateMemoryUsage(unboundedMemoryId, 0L , isBoundedMemory = false )
3810
-
3811
- // Test that both instances get the same divided value from globalPinnedUsage
3812
- val result1 = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
3813
- boundedMemoryId1,
3814
- sharedCachePinnedUsage,
3815
- requestedPinnedUsage)
3816
- val result2 = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
3817
- boundedMemoryId2,
3818
- sharedCachePinnedUsage,
3819
- requestedPinnedUsage)
3820
- val result3 = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
3821
- unboundedMemoryId,
3822
- sharedCachePinnedUsage,
3823
- requestedPinnedUsage)
3824
-
3825
- // With 2 bounded instances, each should get half of globalPinnedUsage
3826
- assert(result1 === 500L , s " Expected 500L for bounded instance 1, got $result1" )
3827
- assert(result2 === 500L , s " Expected 500L for bounded instance 2, got $result2" )
3828
- assert(result3 === 300L , s " Expected 300L for unbounded instance, got $result3" )
3829
-
3830
- // Test with zero instances (unregistered instance)
3831
- RocksDBMemoryManager .resetWriteBufferManagerAndCache
3832
- val resultZero = RocksDBMemoryManager .getInstancePinnedBlocksMemUsage(
3833
- boundedMemoryId1,
3834
- sharedCachePinnedUsage,
3835
- requestedPinnedUsage)
3836
- assert(resultZero === 0L , s " Expected 0L when no instances, got $resultZero" )
3837
- } finally {
3838
- RocksDBMemoryManager .resetWriteBufferManagerAndCache
3839
- }
3840
- }
3841
3842
}
3842
3843
3843
3844
object RocksDBSuite {
0 commit comments