Commit 4ea60ce

fix: Simplify CometShuffleMemoryAllocator logic, rename classes, remove config (#1485)
1 parent 1979218 commit 4ea60ce

8 files changed: +113, -117 lines

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 11 deletions
@@ -404,17 +404,6 @@ object CometConf extends ShimCometConf {
         "Ensure that Comet shuffle memory overhead factor is a double greater than 0")
       .createWithDefault(1.0)

-  val COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST: ConfigEntry[Boolean] =
-    conf("spark.comet.columnar.shuffle.unifiedMemoryAllocatorTest")
-      .doc("Whether to use Spark unified memory allocator for Comet columnar shuffle in tests." +
-        "If not configured, Comet will use a test-only memory allocator for Comet columnar " +
-        "shuffle when Spark test env detected. The test-ony allocator is proposed to run with " +
-        "Spark tests as these tests require on-heap memory configuration. " +
-        "By default, this config is false.")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.batch.size")
       .internal()
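
The removed flag only mattered for tests; which allocator the columnar shuffle uses is now driven by Spark's standard off-heap settings rather than a Comet-specific switch. A minimal, hedged sketch of a session that would exercise the off-heap (unified) path, assuming the usual Comet plugin/extension setup is already in place (omitted here); drop the offHeap settings and Comet falls back to the bounded allocator:

import org.apache.comet.CometConf
import org.apache.spark.sql.SparkSession

// Hedged sketch, not part of this commit: allocator choice now follows Spark's
// own off-heap configuration instead of the removed test-only flag.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.memory.offHeap.enabled", "true") // standard Spark setting
  .config("spark.memory.offHeap.size", "2g") // standard Spark setting
  .config(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
  .config(CometConf.COMET_SHUFFLE_MODE.key, "jvm")
  .getOrCreate()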

spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java renamed to spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java

Lines changed: 2 additions & 4 deletions
@@ -48,7 +48,7 @@
  * Thus, this allocator is used to allocate separate off-heap memory allocation for Comet JVM
  * shuffle and execution apart from Spark's on-heap memory configuration.
  */
-public final class CometTestShuffleMemoryAllocator extends CometShuffleMemoryAllocatorTrait {
+public final class CometBoundedShuffleMemoryAllocator extends CometShuffleMemoryAllocatorTrait {
   private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();

   private final long pageSize;
@@ -67,9 +67,7 @@ public final class CometTestShuffleMemoryAllocator extends CometShuffleMemoryAll
   private static final int OFFSET_BITS = 51;
   private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

-  private static CometTestShuffleMemoryAllocator INSTANCE;
-
-  CometTestShuffleMemoryAllocator(
+  CometBoundedShuffleMemoryAllocator(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
     super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
     this.pageSize = pageSize;
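
The constants that survive the rename (OFFSET_BITS = 51, MASK_LONG_LOWER_51_BITS) suggest the usual Tungsten-style page addressing: a page number in the upper bits of a long and a 51-bit offset in the lower bits. An illustrative Scala sketch of that packing, not code copied from the class:

// Illustrative only: the 51-bit split implied by the constants above.
object PageAddressSketch {
  private val OffsetBits = 51
  private val OffsetMask = 0x7FFFFFFFFFFFFL // lower 51 bits

  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)

  def pageNumber(address: Long): Int = (address >>> OffsetBits).toInt

  def offsetInPage(address: Long): Long = address & OffsetMask
}

// e.g. encode(3, 128) packs page 3 with offset 128; the two decoders recover both values.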

spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java

Lines changed: 13 additions & 67 deletions
@@ -19,91 +19,37 @@

 package org.apache.spark.shuffle.comet;

-import java.io.IOException;
-
 import org.apache.spark.SparkConf;
-import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.Utils;
-
-import org.apache.comet.CometConf$;

 /**
- * A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
- * store serialized rows. This class is simply an implementation of `MemoryConsumer` that delegates
- * memory allocation to the `TaskMemoryManager`. This requires that the `TaskMemoryManager` is
- * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ * An interface to instantiate either CometBoundedShuffleMemoryAllocator (on-heap mode) or
+ * CometUnifiedShuffleMemoryAllocator (off-heap mode).
  */
-public final class CometShuffleMemoryAllocator extends CometShuffleMemoryAllocatorTrait {
+public final class CometShuffleMemoryAllocator {
   private static CometShuffleMemoryAllocatorTrait INSTANCE;

   /**
    * Returns the singleton instance of `CometShuffleMemoryAllocator`. This method should be used
    * instead of the constructor to ensure that only one instance of `CometShuffleMemoryAllocator` is
-   * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` which is a test-only
-   * allocator that should not be used in production.
+   * created. For on-heap mode (Spark tests), this returns `CometBoundedShuffleMemoryAllocator`.
    */
   public static CometShuffleMemoryAllocatorTrait getInstance(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
-    boolean isSparkTesting = Utils.isTesting();
-    boolean useUnifiedMemAllocator =
-        (boolean)
-            CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
-
-    if (!useUnifiedMemAllocator) {
-      synchronized (CometShuffleMemoryAllocator.class) {
-        if (INSTANCE == null) {
-          // CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton.
-          INSTANCE = new CometTestShuffleMemoryAllocator(conf, taskMemoryManager, pageSize);
-        }
-      }
-      return INSTANCE;
-    } else {
-      if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
-        throw new IllegalArgumentException(
-            "CometShuffleMemoryAllocator should be used with off-heap "
-                + "memory mode, but got "
-                + taskMemoryManager.getTungstenMemoryMode());
-      }

+    if (taskMemoryManager.getTungstenMemoryMode() == MemoryMode.OFF_HEAP) {
       // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which is not singleton,
       // but one instance per task. So we need to create a new instance for each task.
-      return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+      return new CometUnifiedShuffleMemoryAllocator(taskMemoryManager, pageSize);
     }
-  }
-
-  CometShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long pageSize) {
-    super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
-  }
-
-  public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    // JVM shuffle writer does not support spilling for other memory consumers
-    return 0;
-  }
-
-  public synchronized MemoryBlock allocate(long required) {
-    return this.allocatePage(required);
-  }
-
-  public synchronized void free(MemoryBlock block) {
-    this.freePage(block);
-  }

-  /**
-   * Returns the offset in the page for the given page plus base offset address. Note that this
-   * method assumes that the page number is valid.
-   */
-  public long getOffsetInPage(long pagePlusOffsetAddress) {
-    return taskMemoryManager.getOffsetInPage(pagePlusOffsetAddress);
-  }
-
-  public long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
-    return TaskMemoryManager.encodePageNumberAndOffset(pageNumber, offsetInPage);
-  }
-
-  public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
-    return encodePageNumberAndOffset(page.pageNumber, offsetInPage - page.getBaseOffset());
+    synchronized (CometShuffleMemoryAllocator.class) {
+      if (INSTANCE == null) {
+        // CometBoundedShuffleMemoryAllocator handles pages by itself so it can be a singleton.
+        INSTANCE = new CometBoundedShuffleMemoryAllocator(conf, taskMemoryManager, pageSize);
+      }
+    }
+    return INSTANCE;
   }
 }
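
After this change CometShuffleMemoryAllocator is only a factory: the task's Tungsten memory mode alone decides which concrete allocator comes back, with no test-only config involved. A hedged Scala sketch of a call site (the concrete constructors are package-private, so getInstance is the entry point); allocatorFor is an illustrative helper, not Comet code:

import org.apache.spark.SparkConf
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.shuffle.comet.{CometShuffleMemoryAllocator, CometShuffleMemoryAllocatorTrait}

// Sketch of a call site; the real caller (the JVM shuffle writer) already holds a TaskMemoryManager.
def allocatorFor(
    conf: SparkConf,
    taskMemoryManager: TaskMemoryManager,
    pageSize: Long): CometShuffleMemoryAllocatorTrait = {
  // OFF_HEAP Tungsten mode -> a fresh CometUnifiedShuffleMemoryAllocator per task
  // (its pages live in the per-task TaskMemoryManager);
  // otherwise -> the shared CometBoundedShuffleMemoryAllocator singleton.
  CometShuffleMemoryAllocator.getInstance(conf, taskMemoryManager, pageSize)
}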
spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java (new file)

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.shuffle.comet;
+
+import java.io.IOException;
+
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+
+/**
+ * A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
+ * store serialized rows. This class is simply an implementation of `MemoryConsumer` that delegates
+ * memory allocation to the `TaskMemoryManager`. This requires that the `TaskMemoryManager` is
+ * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ *
+ * <p>If the user does not enable off-heap memory then we want to use
+ * CometBoundedShuffleMemoryAllocator. The tests also need to default to using this because off-heap
+ * is not enabled when running the Spark SQL tests.
+ */
+public final class CometUnifiedShuffleMemoryAllocator extends CometShuffleMemoryAllocatorTrait {
+
+  CometUnifiedShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long pageSize) {
+    super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
+    if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
+      throw new IllegalArgumentException(
+          "CometUnifiedShuffleMemoryAllocator should be used with off-heap "
+              + "memory mode, but got "
+              + taskMemoryManager.getTungstenMemoryMode());
+    }
+  }
+
+  public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
+    // JVM shuffle writer does not support spilling for other memory consumers
+    return 0;
+  }
+
+  public synchronized MemoryBlock allocate(long required) {
+    return this.allocatePage(required);
+  }
+
+  public synchronized void free(MemoryBlock block) {
+    this.freePage(block);
+  }
+
+  /**
+   * Returns the offset in the page for the given page plus base offset address. Note that this
+   * method assumes that the page number is valid.
+   */
+  public long getOffsetInPage(long pagePlusOffsetAddress) {
+    return taskMemoryManager.getOffsetInPage(pagePlusOffsetAddress);
+  }
+
+  public long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
+    return TaskMemoryManager.encodePageNumberAndOffset(pageNumber, offsetInPage);
+  }
+
+  public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
+    return encodePageNumberAndOffset(page.pageNumber, offsetInPage - page.getBaseOffset());
+  }
+}

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 0 additions & 6 deletions
@@ -1418,12 +1418,6 @@ object CometSparkSessionExtensions extends Logging {
     sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
   }

-  def cometShuffleUnifiedMemoryManagerInTestEnabled(sparkConf: SparkConf): Boolean = {
-    sparkConf.getBoolean(
-      CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.key,
-      CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.defaultValue.get)
-  }
-
   /**
    * Attaches explain information to a TreeNode, rolling up the corresponding information tags
    * from any child nodes. For now, we are using this to attach the reasons why certain Spark

spark/src/main/scala/org/apache/spark/Plugins.scala

Lines changed: 11 additions & 9 deletions
@@ -101,15 +101,17 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
    * unified memory manager.
    */
   private def shouldOverrideMemoryConf(conf: SparkConf): Boolean = {
-    conf.getBoolean(CometConf.COMET_ENABLED.key, CometConf.COMET_ENABLED.defaultValue.get) && (
-      conf.getBoolean(
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.defaultValue.get) ||
-        conf.getBoolean(
-          CometConf.COMET_EXEC_ENABLED.key,
-          CometConf.COMET_EXEC_ENABLED.defaultValue.get)
-    ) && (!CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf) ||
-      !CometSparkSessionExtensions.cometShuffleUnifiedMemoryManagerInTestEnabled(conf))
+    val cometEnabled =
+      conf.getBoolean(CometConf.COMET_ENABLED.key, CometConf.COMET_ENABLED.defaultValue.get)
+    val cometExecShuffle = conf.getBoolean(
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key,
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.defaultValue.get)
+    val cometExec = conf.getBoolean(
+      CometConf.COMET_EXEC_ENABLED.key,
+      CometConf.COMET_EXEC_ENABLED.defaultValue.get)
+    val unifiedMemory = CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)
+
+    cometEnabled && (cometExecShuffle || cometExec) && !unifiedMemory
   }
 }

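With the test-only clause gone, the override decision reduces to: Comet enabled, exec or shuffle enabled, and unified (off-heap) memory not enabled. A hedged example reusing only keys that appear in this diff; it is not test code from the commit:

import org.apache.comet.CometConf
import org.apache.spark.SparkConf

// Example conf: Comet shuffle is on but Spark's off-heap (unified) memory is also on,
// so shouldOverrideMemoryConf evaluates to false and the driver plugin leaves
// spark.executor.memoryOverhead untouched. Remove the two offHeap settings and it becomes true.
val conf = new SparkConf()
  .set(CometConf.COMET_ENABLED.key, "true")
  .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")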
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala

Lines changed: 0 additions & 10 deletions
@@ -40,7 +40,6 @@ import org.apache.comet.{CometConf, CometSparkSessionExtensions}
 abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   protected val adaptiveExecutionEnabled: Boolean
   protected val numElementsForceSpillThreshold: Int = 10
-  protected val useUnifiedMemoryAllocator: Boolean = true

   override protected def sparkConf: SparkConf = {
     val conf = super.sparkConf
@@ -58,8 +57,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
       CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
       CometConf.COMET_EXEC_ENABLED.key -> "false",
       CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
-      CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.key ->
-        useUnifiedMemoryAllocator.toString,
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
       CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
       testFun
@@ -997,13 +994,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
   }
 }

-class CometTestMemoryAllocatorShuffleSuite extends CometColumnarShuffleSuite {
-  override protected val asyncShuffleEnable: Boolean = false
-  override protected val adaptiveExecutionEnabled: Boolean = true
-  // Explicitly test with `CometTestShuffleMemoryAllocator`
-  override protected val useUnifiedMemoryAllocator: Boolean = false
-}
-
 class CometAsyncShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = true

spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala

Lines changed: 8 additions & 10 deletions
@@ -161,21 +161,19 @@ class CometPluginsUnifiedModeOverrideSuite extends CometTestBase {
   }

   /*
-   * Since using unified memory, but not shuffle unified memory
-   * executor memory should be overridden by adding comet shuffle memory size
+   * Since using unified memory executor memory should not be overridden
    */
-  test("executor memory overhead is correctly overridden") {
+  test("executor memory overhead is not overridden") {
     val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
     val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
     val execMemOverhead3 = spark.sparkContext.getConf.get("spark.executor.memoryOverhead")
     val execMemOverhead4 = spark.sparkContext.conf.get("spark.executor.memoryOverhead")

-    // in unified memory mode, comet memory overhead is spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G
-    // so the total executor memory overhead is executor memory overhead (1G) + comet memory overhead (1G) = 2G
-    // and the overhead is overridden in MiB
-    assert(execMemOverhead1 == "2048M")
-    assert(execMemOverhead2 == "2048M")
-    assert(execMemOverhead3 == "2048M")
-    assert(execMemOverhead4 == "2048M")
+    // in unified memory mode, comet memory overhead is
+    // spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G and the overhead is not overridden
+    assert(execMemOverhead1 == "1G")
+    assert(execMemOverhead2 == "1G")
+    assert(execMemOverhead3 == "1G")
+    assert(execMemOverhead4 == "1G")
   }
 }
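
The arithmetic behind the updated assertions, spelled out as a small hedged sketch (the sizes and factor come straight from the test comment; the variable names are illustrative):

// Comet overhead in unified memory mode = spark.memory.offHeap.size * spark.comet.memory.overhead.factor
val offHeapSizeBytes = 2L * 1024 * 1024 * 1024 // 2G, from the suite's configuration
val overheadFactor = 0.5 // spark.comet.memory.overhead.factor
val cometOverheadBytes = (offHeapSizeBytes * overheadFactor).toLong // 1G

// Old behaviour: executor overhead (1G) + comet overhead (1G) was written back as "2048M".
// New behaviour: in unified memory mode the override is skipped, so the assertions see "1G".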
