Skip to content

Commit 88797d3

Browse files
authored
chore: Simplify on-heap memory configuration (apache#2599)
1 parent 8810d19 commit 88797d3

File tree

16 files changed

+66
-174
lines changed

16 files changed

+66
-174
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -268,41 +268,13 @@ object CometConf extends ShimCometConf {
268268
.booleanConf
269269
.createWithDefault(false)
270270

271-
val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
271+
val COMET_ONHEAP_MEMORY_OVERHEAD: ConfigEntry[Long] = conf("spark.comet.memoryOverhead")
272272
.category(CATEGORY_TESTING)
273273
.doc(
274274
"The amount of additional memory to be allocated per executor process for Comet, in MiB, " +
275-
"when running Spark in on-heap mode. " +
276-
"This config is optional. If this is not specified, it will be set to " +
277-
s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. $TUNING_GUIDE.")
278-
.internal()
275+
"when running Spark in on-heap mode.")
279276
.bytesConf(ByteUnit.MiB)
280-
.createOptional
281-
282-
val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
283-
conf("spark.comet.memory.overhead.factor")
284-
.category(CATEGORY_TESTING)
285-
.doc(
286-
"Fraction of executor memory to be allocated as additional memory for Comet " +
287-
"when running Spark in on-heap mode. " +
288-
s"$TUNING_GUIDE.")
289-
.internal()
290-
.doubleConf
291-
.checkValue(
292-
factor => factor > 0,
293-
"Ensure that Comet memory overhead factor is a double greater than 0")
294-
.createWithDefault(0.2)
295-
296-
val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = conf("spark.comet.memory.overhead.min")
297-
.category(CATEGORY_TESTING)
298-
.doc("Minimum amount of additional memory to be allocated per executor process for Comet, " +
299-
s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
300-
.internal()
301-
.bytesConf(ByteUnit.MiB)
302-
.checkValue(
303-
_ >= 0,
304-
"Ensure that Comet memory overhead min is a long greater than or equal to 0")
305-
.createWithDefault(384)
277+
.createWithDefault(1024)
306278

307279
val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
308280
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
@@ -425,18 +397,8 @@ object CometConf extends ShimCometConf {
425397
.intConf
426398
.createWithDefault(Int.MaxValue)
427399

428-
val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
429-
conf("spark.comet.columnar.shuffle.memorySize")
430-
.internal()
431-
.category(CATEGORY_TESTING)
432-
.doc("Amount of memory to reserve for columnar shuffle when running in on-heap mode. " +
433-
s"$TUNING_GUIDE.")
434-
.bytesConf(ByteUnit.MiB)
435-
.createOptional
436-
437-
val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
400+
val COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
438401
conf("spark.comet.columnar.shuffle.memory.factor")
439-
.internal()
440402
.category(CATEGORY_TESTING)
441403
.doc("Fraction of Comet memory to be allocated per executor process for columnar shuffle " +
442404
s"when running in on-heap mode. $TUNING_GUIDE.")
@@ -523,7 +485,6 @@ object CometConf extends ShimCometConf {
523485
.category(CATEGORY_EXEC_EXPLAIN)
524486
.doc("When this setting is enabled, Comet will log all plan transformations performed " +
525487
"in physical optimizer rules. Default: false")
526-
.internal()
527488
.booleanConf
528489
.createWithDefault(false)
529490

@@ -558,15 +519,14 @@ object CometConf extends ShimCometConf {
558519
.booleanConf
559520
.createWithDefault(false)
560521

561-
val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
522+
val COMET_ONHEAP_ENABLED: ConfigEntry[Boolean] =
562523
conf("spark.comet.exec.onHeap.enabled")
563524
.category(CATEGORY_TESTING)
564525
.doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
565-
.internal()
566526
.booleanConf
567527
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)
568528

569-
val COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
529+
val COMET_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
570530
conf("spark.comet.exec.memoryPool")
571531
.category(CATEGORY_TUNING)
572532
.doc(
@@ -576,19 +536,18 @@ object CometConf extends ShimCometConf {
576536
.stringConf
577537
.createWithDefault("fair_unified")
578538

579-
val COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
539+
val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
580540
"spark.comet.exec.onHeap.memoryPool")
581-
.category(CATEGORY_TUNING)
541+
.category(CATEGORY_TESTING)
582542
.doc(
583543
"The type of memory pool to be used for Comet native execution " +
584544
"when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, " +
585545
"`greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, " +
586546
"and `unbounded`.")
587-
.internal()
588547
.stringConf
589548
.createWithDefault("greedy_task_shared")
590549

591-
val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
550+
val COMET_OFFHEAP_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
592551
conf("spark.comet.exec.memoryPool.fraction")
593552
.category(CATEGORY_TUNING)
594553
.doc(

docs/source/user-guide/latest/configs.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ These settings can be used to determine which parts of the plan are accelerated
8383
| Config | Description | Default Value |
8484
|--------|-------------|---------------|
8585
| `spark.comet.explain.native.enabled` | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
86+
| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. | false |
8687
| `spark.comet.explain.verbose.enabled` | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
8788
| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
8889
| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
@@ -119,6 +120,18 @@ These settings can be used to determine which parts of the plan are accelerated
119120
| `spark.comet.tracing.enabled` | Enable fine-grained tracing of events and memory usage. For more information, refer to the [Comet Tracing Guide](https://datafusion.apache.org/comet/user-guide/tracing.html). | false |
120121
<!--END:CONFIG_TABLE-->
121122

123+
## Development & Testing Settings
124+
125+
<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
126+
<!--BEGIN:CONFIG_TABLE[testing]-->
127+
| Config | Description | Default Value |
128+
|--------|-------------|---------------|
129+
| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
130+
| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
131+
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
132+
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
133+
<!--END:CONFIG_TABLE-->
134+
122135
## Enabling or Disabling Individual Operators
123136

124137
<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,10 @@ object CometExecIterator extends Logging {
273273
if (offHeapMode) {
274274
// in off-heap mode, Comet uses unified memory management to share off-heap memory with Spark
275275
val offHeapSize = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
276-
val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
276+
val memoryFraction = CometConf.COMET_OFFHEAP_MEMORY_POOL_FRACTION.get()
277277
val memoryLimit = (offHeapSize * memoryFraction).toLong
278278
val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
279-
val memoryPoolType = COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE.get()
279+
val memoryPoolType = COMET_OFFHEAP_MEMORY_POOL_TYPE.get()
280280
logInfo(
281281
s"memoryPoolType=$memoryPoolType, " +
282282
s"offHeapSize=${toMB(offHeapSize)}, " +
@@ -291,7 +291,7 @@ object CometExecIterator extends Logging {
291291
// example 16GB maxMemory * 16 cores with 4 cores per task results
292292
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
293293
val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
294-
val memoryPoolType = COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE.get()
294+
val memoryPoolType = COMET_ONHEAP_MEMORY_POOL_TYPE.get()
295295
logInfo(
296296
s"memoryPoolType=$memoryPoolType, " +
297297
s"memoryLimit=${toMB(memoryLimit)}, " +

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -243,46 +243,25 @@ object CometSparkSessionExtensions extends Logging {
243243
}
244244

245245
/**
246-
* Calculates required memory overhead in MB per executor process for Comet when running in
246+
* Determines required memory overhead in MB per executor process for Comet when running in
247247
* on-heap mode.
248-
*
249-
* If `COMET_MEMORY_OVERHEAD` is defined then that value will be used, otherwise the overhead
250-
* will be calculated by multiplying executor memory (`spark.executor.memory`) by
251-
* `COMET_MEMORY_OVERHEAD_FACTOR`.
252-
*
253-
* In either case, a minimum value of `COMET_MEMORY_OVERHEAD_MIN_MIB` will be returned.
254248
*/
255249
def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
256250
if (isOffHeapEnabled(sparkConf)) {
257251
// when running in off-heap mode we use unified memory management to share
258252
// off-heap memory with Spark so do not add overhead
259253
return 0
260254
}
261-
262-
// `spark.executor.memory` default value is 1g
263-
val baseMemoryMiB = ConfigHelpers
264-
.byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), ByteUnit.MiB)
265-
266-
val cometMemoryOverheadMinAsString = sparkConf.get(
267-
COMET_MEMORY_OVERHEAD_MIN_MIB.key,
268-
COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString)
269-
270-
val minimum = ConfigHelpers.byteFromString(cometMemoryOverheadMinAsString, ByteUnit.MiB)
271-
val overheadFactor = getDoubleConf(sparkConf, COMET_MEMORY_OVERHEAD_FACTOR)
272-
273-
val overHeadMemFromConf = sparkConf
274-
.getOption(COMET_MEMORY_OVERHEAD.key)
275-
.map(ConfigHelpers.byteFromString(_, ByteUnit.MiB))
276-
277-
overHeadMemFromConf.getOrElse(math.max((overheadFactor * baseMemoryMiB).toLong, minimum))
255+
ConfigHelpers.byteFromString(
256+
sparkConf.get(
257+
COMET_ONHEAP_MEMORY_OVERHEAD.key,
258+
COMET_ONHEAP_MEMORY_OVERHEAD.defaultValueString),
259+
ByteUnit.MiB)
278260
}
279261

280262
private def getBooleanConf(conf: SparkConf, entry: ConfigEntry[Boolean]) =
281263
conf.getBoolean(entry.key, entry.defaultValue.get)
282264

283-
private def getDoubleConf(conf: SparkConf, entry: ConfigEntry[Double]) =
284-
conf.getDouble(entry.key, entry.defaultValue.get)
285-
286265
/**
287266
* Calculates required memory overhead in bytes per executor process for Comet when running in
288267
* on-heap mode.
@@ -300,11 +279,9 @@ object CometSparkSessionExtensions extends Logging {
300279

301280
val cometMemoryOverhead = getCometMemoryOverheadInMiB(sparkConf)
302281

303-
val overheadFactor = COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.get(conf)
304-
val cometShuffleMemoryFromConf = COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.get(conf)
282+
val overheadFactor = COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR.get(conf)
305283

306-
val shuffleMemorySize =
307-
cometShuffleMemoryFromConf.getOrElse((overheadFactor * cometMemoryOverhead).toLong)
284+
val shuffleMemorySize = (overheadFactor * cometMemoryOverhead).toLong
308285
if (shuffleMemorySize > cometMemoryOverhead) {
309286
logWarning(
310287
s"Configured shuffle memory size $shuffleMemorySize is larger than Comet memory overhead " +

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
2525

2626
import org.apache.spark.sql.catalyst.expressions.Cast
2727

28+
import org.apache.comet.CometConf.COMET_ONHEAP_MEMORY_OVERHEAD
2829
import org.apache.comet.expressions.{CometCast, CometEvalMode}
2930
import org.apache.comet.serde.{Compatible, Incompatible, QueryPlanSerde}
3031

@@ -78,7 +79,13 @@ object GenerateDocs {
7879
if (conf.defaultValue.isEmpty) {
7980
w.write(s"| `${conf.key}` | $doc | |\n".getBytes)
8081
} else {
81-
w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
82+
val isBytesConf = conf.key == COMET_ONHEAP_MEMORY_OVERHEAD.key
83+
if (isBytesConf) {
84+
val bytes = conf.defaultValue.get.asInstanceOf[Long]
85+
w.write(s"| `${conf.key}` | $doc | $bytes MiB |\n".getBytes)
86+
} else {
87+
w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
88+
}
8289
}
8390
}
8491
}

spark/src/main/scala/org/apache/spark/Plugins.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
2828
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
2929
import org.apache.spark.sql.internal.StaticSQLConf
3030

31-
import org.apache.comet.CometConf.COMET_ENABLE_ONHEAP_MODE
31+
import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
3232
import org.apache.comet.CometSparkSessionExtensions
3333

3434
/**
@@ -49,7 +49,7 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
4949
logInfo("CometDriverPlugin init")
5050

5151
if (!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf) &&
52-
!sc.getConf.getBoolean(COMET_ENABLE_ONHEAP_MODE.key, false)) {
52+
!sc.getConf.getBoolean(COMET_ONHEAP_ENABLED.key, false)) {
5353
logWarning("Comet plugin is disabled because Spark is not running in off-heap mode.")
5454
return Collections.emptyMap[String, String]
5555
}

spark/src/test/scala/org/apache/comet/CometSparkSessionExtensionsSuite.scala

Lines changed: 6 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -76,47 +76,21 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
7676

7777
def getBytesFromMib(mib: Long): Long = mib * 1024 * 1024
7878

79-
test("Minimum Comet memory overhead") {
79+
test("Default Comet memory overhead") {
8080
val conf = new SparkConf()
81-
assert(getCometMemoryOverhead(conf) == getBytesFromMib(384))
82-
}
83-
84-
test("Comet memory overhead factor with executor memory") {
85-
val sparkConf = new SparkConf()
86-
sparkConf.set("spark.executor.memory", "16g")
87-
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
88-
89-
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(8 * 1024))
90-
}
91-
92-
test("Comet memory overhead factor with default executor memory") {
93-
val sparkConf = new SparkConf()
94-
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
95-
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(512))
81+
assert(getCometMemoryOverhead(conf) == getBytesFromMib(1024))
9682
}
9783

9884
test("Comet memory overhead") {
9985
val sparkConf = new SparkConf()
100-
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "10g")
86+
sparkConf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "10g")
10187
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(1024 * 10))
10288
assert(shouldOverrideMemoryConf(sparkConf))
10389
}
10490

105-
test("Comet memory overhead (min)") {
106-
val sparkConf = new SparkConf()
107-
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_MIN_MIB.key, "2g")
108-
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(1024 * 2))
109-
}
110-
111-
test("Comet memory overhead (factor)") {
112-
val sparkConf = new SparkConf()
113-
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
114-
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(512))
115-
}
116-
11791
test("Comet memory overhead (off heap)") {
11892
val sparkConf = new SparkConf()
119-
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "64g")
93+
sparkConf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "64g")
12094
sparkConf.set("spark.memory.offHeap.enabled", "true")
12195
sparkConf.set("spark.memory.offHeap.size", "10g")
12296
assert(getCometMemoryOverhead(sparkConf) == 0)
@@ -127,46 +101,11 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
127101
val conf = new SparkConf()
128102

129103
val sqlConf = new SQLConf
130-
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key, "0.2")
104+
sqlConf.setConfString(CometConf.COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR.key, "0.2")
131105

132106
// Default Comet memory overhead is 1024MB
133107
assert(
134108
getCometShuffleMemorySize(conf, sqlConf) ==
135-
getBytesFromMib((384 * 0.2).toLong))
136-
137-
conf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
138-
assert(
139-
getCometShuffleMemorySize(conf, sqlConf) ==
140-
getBytesFromMib((1024 * 0.5 * 0.2).toLong))
141-
}
142-
143-
test("Comet shuffle memory") {
144-
val conf = new SparkConf()
145-
val sqlConf = new SQLConf
146-
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
147-
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "512m")
148-
149-
assert(getCometShuffleMemorySize(conf, sqlConf) == getBytesFromMib(512))
150-
}
151-
152-
test("Comet shuffle memory (off-heap)") {
153-
val conf = new SparkConf()
154-
val sqlConf = new SQLConf
155-
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
156-
conf.set("spark.memory.offHeap.enabled", "true")
157-
conf.set("spark.memory.offHeap.size", "10g")
158-
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "512m")
159-
160-
assertThrows[AssertionError] {
161-
getCometShuffleMemorySize(conf, sqlConf)
162-
}
163-
}
164-
165-
test("Comet shuffle memory cannot be larger than Comet memory overhead") {
166-
val conf = new SparkConf()
167-
val sqlConf = new SQLConf
168-
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
169-
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "10g")
170-
assert(getCometShuffleMemorySize(conf, sqlConf) == getBytesFromMib(1024))
109+
getBytesFromMib((1024 * 0.2).toLong))
171110
}
172111
}

spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
6161
CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
6262
CometConf.COMET_EXEC_ENABLED.key -> "false",
6363
CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
64-
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
65-
CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
64+
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
6665
testFun
6766
}
6867
}

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,7 @@ class CometExecSuite extends CometTestBase {
12511251
}
12521252

12531253
test("spill sort with (multiple) dictionaries") {
1254-
withSQLConf(CometConf.COMET_MEMORY_OVERHEAD.key -> "15MB") {
1254+
withSQLConf(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "15MB") {
12551255
withTempDir { dir =>
12561256
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
12571257
makeRawTimeParquetFileColumns(path, dictionaryEnabled = true, n = 1000, rowGroupSize = 10)
@@ -1270,7 +1270,7 @@ class CometExecSuite extends CometTestBase {
12701270
}
12711271

12721272
test("spill sort with (multiple) dictionaries on mixed columns") {
1273-
withSQLConf(CometConf.COMET_MEMORY_OVERHEAD.key -> "15MB") {
1273+
withSQLConf(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "15MB") {
12741274
withTempDir { dir =>
12751275
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
12761276
makeRawTimeParquetFile(path, dictionaryEnabled = true, n = 1000, rowGroupSize = 10)

0 commit comments

Comments
 (0)