Skip to content

Commit 68d756a

Browse files
authored
feat: Various improvements to memory pool configuration, logging, and documentation (#2538)
1 parent ad0c2c7 commit 68d756a

File tree

7 files changed

+140
-180
lines changed

7 files changed

+140
-180
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ object CometConf extends ShimCometConf {
257257
"when running Spark in on-heap mode. " +
258258
"This config is optional. If this is not specified, it will be set to " +
259259
s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. $TUNING_GUIDE.")
260+
.internal()
260261
.bytesConf(ByteUnit.MiB)
261262
.createOptional
262263

@@ -266,6 +267,7 @@ object CometConf extends ShimCometConf {
266267
"Fraction of executor memory to be allocated as additional memory for Comet " +
267268
"when running Spark in on-heap mode. " +
268269
s"$TUNING_GUIDE.")
270+
.internal()
269271
.doubleConf
270272
.checkValue(
271273
factor => factor > 0,
@@ -275,6 +277,7 @@ object CometConf extends ShimCometConf {
275277
val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = conf("spark.comet.memory.overhead.min")
276278
.doc("Minimum amount of additional memory to be allocated per executor process for Comet, " +
277279
s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
280+
.internal()
278281
.bytesConf(ByteUnit.MiB)
279282
.checkValue(
280283
_ >= 0,
@@ -513,19 +516,38 @@ object CometConf extends ShimCometConf {
513516
val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
514517
conf("spark.comet.exec.onHeap.enabled")
515518
.doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
519+
.internal()
516520
.booleanConf
517521
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)
518522

519-
val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
523+
val COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
524+
conf("spark.comet.exec.memoryPool")
525+
.doc(
526+
"The type of memory pool to be used for Comet native execution when running Spark in " +
527+
"off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " +
528+
s"$TUNING_GUIDE.")
529+
.stringConf
530+
.createWithDefault("fair_unified")
531+
532+
val COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
533+
"spark.comet.exec.onHeap.memoryPool")
520534
.doc(
521-
"The type of memory pool to be used for Comet native execution. " +
522-
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
523-
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
524-
"and `unbounded`. When running Spark in off-heap mode, available pool types are " +
525-
"'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` " +
526-
s"for on-heap mode and `unified` for off-heap mode. $TUNING_GUIDE.")
535+
"The type of memory pool to be used for Comet native execution " +
536+
"when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, " +
537+
"`greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, " +
538+
"and `unbounded`.")
539+
.internal()
527540
.stringConf
528-
.createWithDefault("default")
541+
.createWithDefault("greedy_task_shared")
542+
543+
val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
544+
conf("spark.comet.exec.memoryPool.fraction")
545+
.doc(
546+
"Fraction of off-heap memory pool that is available to Comet. " +
547+
"Only applies to off-heap mode. " +
548+
s"$TUNING_GUIDE.")
549+
.doubleConf
550+
.createWithDefault(1.0)
529551

530552
val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
531553
conf("spark.comet.scan.preFetch.enabled")

docs/source/user-guide/latest/configs.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ Comet provides the following configuration settings.
4949
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
5050
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
5151
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
52-
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
53-
| spark.comet.exec.onHeap.enabled | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
52+
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution when running Spark in off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | fair_unified |
53+
| spark.comet.exec.memoryPool.fraction | Fraction of off-heap memory pool that is available to Comet. Only applies to off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
5454
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
5555
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
5656
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
@@ -69,9 +69,6 @@ Comet provides the following configuration settings.
6969
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
7070
| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
7171
| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
72-
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
73-
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
74-
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
7572
| spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
7673
| spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true |
7774
| spark.comet.native.shuffle.partitioning.range.enabled | Whether to enable range partitioning for Comet native shuffle. | true |

docs/source/user-guide/latest/tuning.md

Lines changed: 12 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -36,41 +36,15 @@ than before. See the [Determining How Much Memory to Allocate] section for more
3636

3737
[Determining How Much Memory to Allocate]: #determining-how-much-memory-to-allocate
3838

39-
Comet supports Spark's on-heap (the default) and off-heap mode for allocating memory. However, we strongly recommend
40-
using off-heap mode. Comet has some limitations when running in on-heap mode, such as requiring more memory overall,
41-
and requiring shuffle memory to be separately configured.
39+
### Configuring Comet Memory
4240

43-
### Configuring Comet Memory in Off-Heap Mode
44-
45-
The recommended way to allocate memory for Comet is to set `spark.memory.offHeap.enabled=true`. This allows
46-
Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is
41+
Comet shares an off-heap memory pool with Spark. The size of the pool is
4742
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to
4843
[Spark documentation]. For full details on configuring Comet memory in off-heap mode, see the [Advanced Memory Tuning]
4944
section of this guide.
5045

5146
[Spark documentation]: https://spark.apache.org/docs/latest/configuration.html
5247

53-
### Configuring Comet Memory in On-Heap Mode
54-
55-
```{warning}
56-
Support for on-heap memory pools is deprecated and will be removed from a future release.
57-
```
58-
59-
Comet is disabled by default in on-heap mode, but can be enabled by setting `spark.comet.exec.onHeap.enabled=true`.
60-
61-
When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting
62-
is not provided, it will be calculated by multiplying the current Spark executor memory by
63-
`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
64-
Comet to operate. It is not recommended to rely on this behavior. It is better to specify `spark.comet.memoryOverhead`
65-
explicitly.
66-
67-
Comet supports native shuffle and columnar shuffle (these terms are explained in the [shuffle] section below).
68-
In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
69-
If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by
70-
`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
71-
then the query will fail. For full details on configuring Comet memory in on-heap mode, see the [Advanced Memory Tuning]
72-
section of this guide.
73-
7448
[shuffle]: #shuffle
7549

7650
[Advanced Memory Tuning]: #advanced-memory-tuning
@@ -93,7 +67,7 @@ Baseline Spark Performance
9367

9468
Comet Performance
9569

96-
- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap mode, but performance at this level
70+
- Comet requires at least 5 GB of RAM, but performance at this level
9771
is around 340 seconds, which is significantly faster than Spark with any amount of RAM
9872
- Comet running in off-heap with 8 cores completes the benchmark in 295 seconds, more than 2x faster than Spark
9973
- It is worth noting that running Comet with only 4 cores and 4 GB RAM completes the benchmark in 520 seconds,
@@ -114,79 +88,30 @@ Workarounds for this problem include:
11488

11589
## Advanced Memory Tuning
11690

117-
### Configuring Off-Heap Memory Pools
91+
### Configuring Comet Memory Pools
11892

11993
Comet provides multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.
12094

121-
The valid pool types for off-heap mode are:
95+
The valid pool types are:
12296

12397
- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
12498
- `greedy_unified`
12599

126100
Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
127101
unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`.
128102

129-
The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
130-
need to spill or have a single spillable operator.
103+
Comet's memory accounting isn't 100% accurate and this can result in Comet using more memory than it reserves,
104+
leading to out-of-memory exceptions. To work around this issue, it is possible to
105+
set `spark.comet.exec.memoryPool.fraction` to a value less than `1.0` to restrict the amount of memory that can be
106+
reserved by Comet.
131107

132-
The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
108+
The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
133109
(i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
134110
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
135111
when there is sufficient memory in order to leave enough memory for other operators.
136112

137-
### Configuring On-Heap Memory Pools
138-
139-
```{warning}
140-
Support for on-heap memory pools is deprecated and will be removed from a future release.
141-
```
142-
143-
When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.
144-
145-
The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.
146-
147-
The valid pool types for on-heap mode are:
148-
149-
- `greedy`
150-
- `greedy_global`
151-
- `greedy_task_shared`
152-
- `fair_spill`
153-
- `fair_spill_global`
154-
- `fair_spill_task_shared`
155-
- `unbounded`
156-
157-
Pool types ending with `_global` use a single global memory pool between all tasks on same executor.
158-
159-
Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task.
160-
161-
Other pool types create a dedicated pool per native query plan using a fraction of the available pool size based on number of cores
162-
and cores per task.
163-
164-
The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This
165-
pool works well for queries that do not need to spill or have a single spillable operator.
166-
167-
The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more
168-
than an even fraction of the available memory sans any unspillable reservations
169-
(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`). This pool works best when you know beforehand
170-
the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even
171-
when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in
172-
a first-come, first-serve fashion
173-
174-
The `unbounded` pool type uses DataFusion's [UnboundedMemoryPool], which enforces no limit. This option is useful for
175-
development/testing purposes, where there is no room to allow spilling and rather choose to fail the job.
176-
Spilling significantly slows down the job and this option is one way to measure the best performance scenario without
177-
adjusting how much memory to allocate.
178-
179-
[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html
180-
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
181-
[UnboundedMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html
182-
183-
### Configuring spark.executor.memoryOverhead in On-Heap Mode
184-
185-
In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
186-
that it is possible to allocate off-heap memory when running in on-heap mode.
187-
188-
Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
189-
resource managers respect Apache Spark memory configuration before starting the containers.
113+
The `greedy_unified` pool type implements a greedy first-come, first-served limit. This pool works well for queries that do not
114+
need to spill or have a single spillable operator.
190115

191116
## Optimizing Joins
192117

native/core/src/execution/memory_pools/config.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ pub(crate) fn parse_memory_pool_config(
6262
let pool_size = memory_limit as usize;
6363
let memory_pool_config = if off_heap_mode {
6464
match memory_pool_type.as_str() {
65-
"default" | "fair_unified" => {
66-
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size)
67-
}
65+
"fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
6866
"greedy_unified" => {
6967
// the `unified` memory pool interacts with Spark's memory pool to allocate
7068
// memory therefore does not need a size to be explicitly set. The pool size
@@ -84,7 +82,7 @@ pub(crate) fn parse_memory_pool_config(
8482
"fair_spill_task_shared" => {
8583
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task)
8684
}
87-
"default" | "greedy_task_shared" => {
85+
"greedy_task_shared" => {
8886
MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task)
8987
}
9088
"fair_spill_global" => {

native/core/src/execution/memory_pools/fair_pool.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,10 @@ impl MemoryPool for CometFairMemoryPool {
133133
if additional > 0 {
134134
let mut state = self.state.lock();
135135
let num = state.num;
136-
let limit = self.pool_size.checked_div(num).unwrap();
136+
let limit = self
137+
.pool_size
138+
.checked_div(num)
139+
.expect("overflow in checked_div");
137140
let size = reservation.size();
138141
if limit < size + additional {
139142
return resources_err!(
@@ -155,7 +158,10 @@ impl MemoryPool for CometFairMemoryPool {
155158
state.used
156159
);
157160
}
158-
state.used = state.used.checked_add(additional).unwrap();
161+
state.used = state
162+
.used
163+
.checked_add(additional)
164+
.expect("overflow in checked_add");
159165
}
160166
Ok(())
161167
}

0 commit comments

Comments
 (0)