Skip to content

Commit badbd37

Browse files
authored
chore: Fix some inconsistencies in memory pool configuration (#1561)
## Which issue does this PR close? Closes #1560 ## Rationale for this change - Fix some mistakes I made in #1525 - Make some changes to `fair_unified` pool memory calculations
1 parent 98a9ca2 commit badbd37

File tree

8 files changed

+147
-105
lines changed

8 files changed

+147
-105
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -236,17 +236,18 @@ object CometConf extends ShimCometConf {
236236
val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
237237
.doc(
238238
"The amount of additional memory to be allocated per executor process for Comet, in MiB, " +
239-
"when running in on-heap mode or when using the `fair_unified` pool in off-heap mode. " +
239+
"when running Spark in on-heap mode. " +
240240
"This config is optional. If this is not specified, it will be set to " +
241241
s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. $TUNING_GUIDE.")
242242
.bytesConf(ByteUnit.MiB)
243243
.createOptional
244244

245245
val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
246246
conf("spark.comet.memory.overhead.factor")
247-
.doc("Fraction of executor memory to be allocated as additional memory for Comet " +
248-
"when running in on-heap mode or when using the `fair_unified` pool in off-heap mode. " +
249-
s"$TUNING_GUIDE.")
247+
.doc(
248+
"Fraction of executor memory to be allocated as additional memory for Comet " +
249+
"when running Spark in on-heap mode. " +
250+
s"$TUNING_GUIDE.")
250251
.doubleConf
251252
.checkValue(
252253
factor => factor > 0,
@@ -255,8 +256,7 @@ object CometConf extends ShimCometConf {
255256

256257
val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = conf("spark.comet.memory.overhead.min")
257258
.doc("Minimum amount of additional memory to be allocated per executor process for Comet, " +
258-
"in MiB, when running in on-heap mode or when using the `fair_unified` pool in off-heap " +
259-
s"mode. $TUNING_GUIDE.")
259+
s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
260260
.bytesConf(ByteUnit.MiB)
261261
.checkValue(
262262
_ >= 0,
@@ -485,13 +485,14 @@ object CometConf extends ShimCometConf {
485485
.createWithDefault(false)
486486

487487
val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
488-
.doc(
489-
"The type of memory pool to be used for Comet native execution. " +
490-
"Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
491-
"'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. " +
492-
"For off-heap types are 'unified' and `fair_unified`.")
488+
.doc("The type of memory pool to be used for Comet native execution. " +
489+
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
490+
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
491+
"and `unbounded`. When running Spark in off-heap mode, available pool types are " +
492+
"'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap " +
493+
s"mode and `unified` for off-heap mode. $TUNING_GUIDE.")
493494
.stringConf
494-
.createWithDefault("greedy_task_shared")
495+
.createWithDefault("default")
495496

496497
val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
497498
conf("spark.comet.scan.preFetch.enabled")

docs/source/user-guide/configs.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ Comet provides the following configuration settings.
5454
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
5555
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false |
5656
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
57-
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared |
57+
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
5858
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
5959
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
6060
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
@@ -71,9 +71,9 @@ Comet provides the following configuration settings.
7171
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
7272
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
7373
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
74-
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running in on-heap mode or when using the `fair_unified` pool in off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
75-
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running in on-heap mode or when using the `fair_unified` pool in off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
76-
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running in on-heap mode or when using the `fair_unified` pool in off-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
74+
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
75+
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
76+
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
7777
| spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
7878
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
7979
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |

docs/source/user-guide/tuning.md

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,34 +108,29 @@ resource managers respect Apache Spark memory configuration before starting the
108108

109109
Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.
110110

111-
The valid pool types are:
111+
The valid pool types for off-heap mode are:
112112

113113
- `unified` (default when `spark.memory.offHeap.enabled=true` is set)
114114
- `fair_unified`
115115

116+
Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
117+
unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`.
118+
116119
The `unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
117-
need to spill or have a single spillable operator. The size of the pool is specified by `spark.memory.offHeap.size`
118-
and the pool interacts with Spark's memory pool, effectively sharing the off-heap memory between Spark and Comet. This
119-
approach is sometimes referred to as unified memory management.
120+
need to spill or have a single spillable operator.
120121

121122
The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
122123
(i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
123124
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
124125
when there is sufficient memory in order to leave enough memory for other operators.
125126

126-
The pool size configuration for the `fair_unified` pool, is a little more complex. The total pool size is computed by
127-
multiplying `spark.memory.offHeap.size` by `spark.comet.memory.overhead.factor` with the minimum amount being
128-
`spark.comet.memory.overhead.min`. It is also possible to manually specify `spark.comet.memoryOverhead` instead to
129-
override this default behavior. Note that the `fair_unified` pool does not use unified memory management to interact
130-
with Spark's memory pools, which is why the allocation defaults to a fraction of off-heap memory.
131-
132127
### Configuring On-Heap Memory Pools
133128

134129
When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.
135130

136131
The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.
137132

138-
The valid pool types are:
133+
The valid pool types for on-heap mode are:
139134

140135
- `greedy`
141136
- `greedy_global`

native/core/src/execution/jni_api.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,18 @@ fn parse_memory_pool_config(
302302
let memory_pool_config = if off_heap_mode {
303303
match memory_pool_type.as_str() {
304304
"fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
305-
_ => {
306-
// the Unified memory pool interacts with Spark's memory pool to allocate
305+
"default" | "unified" => {
306+
// the `unified` memory pool interacts with Spark's memory pool to allocate
307307
// memory therefore does not need a size to be explicitly set. The pool size
308308
// shared with Spark is set by `spark.memory.offHeap.size`.
309309
MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
310310
}
311+
_ => {
312+
return Err(CometError::Config(format!(
313+
"Unsupported memory pool type for off-heap mode: {}",
314+
memory_pool_type
315+
)))
316+
}
311317
}
312318
} else {
313319
// Use the memory pool from DF
@@ -316,7 +322,7 @@ fn parse_memory_pool_config(
316322
"fair_spill_task_shared" => {
317323
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task)
318324
}
319-
"greedy_task_shared" => {
325+
"default" | "greedy_task_shared" => {
320326
MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task)
321327
}
322328
"fair_spill_global" => {
@@ -328,7 +334,7 @@ fn parse_memory_pool_config(
328334
"unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0),
329335
_ => {
330336
return Err(CometError::Config(format!(
331-
"Unsupported memory pool type: {}",
337+
"Unsupported memory pool type for on-heap mode: {}",
332338
memory_pool_type
333339
)))
334340
}

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package org.apache.comet
2121

2222
import org.apache.spark._
2323
import org.apache.spark.internal.Logging
24+
import org.apache.spark.network.util.ByteUnit
2425
import org.apache.spark.sql.comet.CometMetricNode
2526
import org.apache.spark.sql.vectorized._
2627

@@ -63,9 +64,17 @@ class CometExecIterator(
6364
}.toArray
6465
private val plan = {
6566
val conf = SparkEnv.get.conf
66-
// Only enable unified memory manager when off-heap mode is enabled. Otherwise,
67-
// we'll use the built-in memory pool from DF, and initializes with `memory_limit`
68-
// and `memory_fraction` below.
67+
68+
val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
69+
val memoryLimit = if (offHeapMode) {
70+
// in unified mode we share off-heap memory with Spark
71+
ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
72+
} else {
73+
// we'll use the built-in memory pool from DF, and initializes with `memory_limit`
74+
// and `memory_fraction` below.
75+
CometSparkSessionExtensions.getCometMemoryOverhead(conf)
76+
}
77+
6978
nativeLib.createPlan(
7079
id,
7180
cometBatchIterators,
@@ -75,9 +84,9 @@ class CometExecIterator(
7584
metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
7685
new CometTaskMemoryManager(id),
7786
batchSize = COMET_BATCH_SIZE.get(),
78-
offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf),
87+
offHeapMode,
7988
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
80-
memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
89+
memoryLimit,
8190
memoryLimitPerTask = getMemoryLimitPerTask(conf),
8291
taskAttemptId = TaskContext.get().taskAttemptId,
8392
debug = COMET_DEBUG_ENABLED.get(),

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1334,26 +1334,46 @@ object CometSparkSessionExtensions extends Logging {
13341334
CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION
13351335
}
13361336

1337-
/** Calculates required memory overhead in MB per executor process for Comet. */
1337+
/**
1338+
* Whether we should override Spark memory configuration for Comet. This only returns true when
1339+
* Comet native execution is enabled and/or Comet shuffle is enabled and Comet doesn't use
1340+
* off-heap mode (unified memory manager).
1341+
*/
1342+
def shouldOverrideMemoryConf(conf: SparkConf): Boolean = {
1343+
val cometEnabled = getBooleanConf(conf, CometConf.COMET_ENABLED)
1344+
val cometShuffleEnabled = getBooleanConf(conf, CometConf.COMET_EXEC_SHUFFLE_ENABLED)
1345+
val cometExecEnabled = getBooleanConf(conf, CometConf.COMET_EXEC_ENABLED)
1346+
val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
1347+
cometEnabled && (cometShuffleEnabled || cometExecEnabled) && !offHeapMode
1348+
}
1349+
1350+
/**
1351+
* Calculates required memory overhead in MB per executor process for Comet when running in
1352+
* on-heap mode.
1353+
*
1354+
* If `COMET_MEMORY_OVERHEAD` is defined then that value will be used, otherwise the overhead
1355+
* will be calculated by multiplying executor memory (`spark.executor.memory`) by
1356+
* `COMET_MEMORY_OVERHEAD_FACTOR`.
1357+
*
1358+
* In either case, a minimum value of `COMET_MEMORY_OVERHEAD_MIN_MIB` will be returned.
1359+
*/
13381360
def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
1339-
val baseMemoryMiB = if (isOffHeapEnabled(sparkConf)) {
1340-
ConfigHelpers
1341-
.byteFromString(sparkConf.get("spark.memory.offHeap.size"), ByteUnit.MiB)
1342-
} else {
1343-
// `spark.executor.memory` default value is 1g
1344-
ConfigHelpers
1345-
.byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), ByteUnit.MiB)
1361+
if (isOffHeapEnabled(sparkConf)) {
1362+
// when running in off-heap mode we use unified memory management to share
1363+
// off-heap memory with Spark so do not add overhead
1364+
return 0
13461365
}
13471366

1348-
val minimum = ConfigHelpers
1349-
.byteFromString(
1350-
sparkConf.get(
1351-
COMET_MEMORY_OVERHEAD_MIN_MIB.key,
1352-
COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString),
1353-
ByteUnit.MiB)
1354-
val overheadFactor = sparkConf.getDouble(
1355-
COMET_MEMORY_OVERHEAD_FACTOR.key,
1356-
COMET_MEMORY_OVERHEAD_FACTOR.defaultValue.get)
1367+
// `spark.executor.memory` default value is 1g
1368+
val baseMemoryMiB = ConfigHelpers
1369+
.byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), ByteUnit.MiB)
1370+
1371+
val cometMemoryOverheadMinAsString = sparkConf.get(
1372+
COMET_MEMORY_OVERHEAD_MIN_MIB.key,
1373+
COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString)
1374+
1375+
val minimum = ConfigHelpers.byteFromString(cometMemoryOverheadMinAsString, ByteUnit.MiB)
1376+
val overheadFactor = getDoubleConf(sparkConf, COMET_MEMORY_OVERHEAD_FACTOR)
13571377

13581378
val overHeadMemFromConf = sparkConf
13591379
.getOption(COMET_MEMORY_OVERHEAD.key)
@@ -1362,7 +1382,16 @@ object CometSparkSessionExtensions extends Logging {
13621382
overHeadMemFromConf.getOrElse(math.max((overheadFactor * baseMemoryMiB).toLong, minimum))
13631383
}
13641384

1365-
/** Calculates required memory overhead in bytes per executor process for Comet. */
1385+
private def getBooleanConf(conf: SparkConf, entry: ConfigEntry[Boolean]) =
1386+
conf.getBoolean(entry.key, entry.defaultValue.get)
1387+
1388+
private def getDoubleConf(conf: SparkConf, entry: ConfigEntry[Double]) =
1389+
conf.getDouble(entry.key, entry.defaultValue.get)
1390+
1391+
/**
1392+
* Calculates required memory overhead in bytes per executor process for Comet when running in
1393+
* on-heap mode.
1394+
*/
13661395
def getCometMemoryOverhead(sparkConf: SparkConf): Long = {
13671396
ByteUnit.MiB.toBytes(getCometMemoryOverheadInMiB(sparkConf))
13681397
}
@@ -1391,11 +1420,6 @@ object CometSparkSessionExtensions extends Logging {
13911420
}
13921421
}
13931422

1394-
/** Calculates Comet shuffle memory size in MB */
1395-
def getCometShuffleMemorySizeInMiB(sparkConf: SparkConf, conf: SQLConf = SQLConf.get): Long = {
1396-
ByteUnit.BYTE.toMiB(getCometShuffleMemorySize(sparkConf, conf))
1397-
}
1398-
13991423
def isOffHeapEnabled(sparkConf: SparkConf): Boolean = {
14001424
sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
14011425
}

0 commit comments

Comments
 (0)