Merged

31 commits
38 changes: 30 additions & 8 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -257,6 +257,7 @@ object CometConf extends ShimCometConf {
"when running Spark in on-heap mode. " +
"This config is optional. If this is not specified, it will be set to " +
s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. $TUNING_GUIDE.")
.internal()
.bytesConf(ByteUnit.MiB)
.createOptional

@@ -266,6 +267,7 @@ object CometConf extends ShimCometConf {
"Fraction of executor memory to be allocated as additional memory for Comet " +
"when running Spark in on-heap mode. " +
s"$TUNING_GUIDE.")
.internal()
.doubleConf
.checkValue(
factor => factor > 0,
@@ -275,6 +277,7 @@
val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = conf("spark.comet.memory.overhead.min")
.doc("Minimum amount of additional memory to be allocated per executor process for Comet, " +
s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
.internal()
.bytesConf(ByteUnit.MiB)
.checkValue(
_ >= 0,
@@ -513,19 +516,38 @@ object CometConf extends ShimCometConf {
val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
conf("spark.comet.exec.onHeap.enabled")
.doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
.internal()
.booleanConf
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)

val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
val COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
conf("spark.comet.exec.memoryPool")
.doc(
"The type of memory pool to be used for Comet native execution when running Spark in " +
"off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " +
s"$TUNING_GUIDE.")
.stringConf
.createWithDefault("fair_unified")

val COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
"spark.comet.exec.onHeap.memoryPool")
.doc(
"The type of memory pool to be used for Comet native execution. " +
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
"and `unbounded`. When running Spark in off-heap mode, available pool types are " +
"'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` " +
s"for on-heap mode and `unified` for off-heap mode. $TUNING_GUIDE.")
"The type of memory pool to be used for Comet native execution " +
"when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, " +
"`greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, " +
"and `unbounded`.")
.internal()
.stringConf
.createWithDefault("default")
.createWithDefault("greedy_task_shared")

val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
conf("spark.comet.exec.memoryPool.fraction")
.doc(
"Fraction of off-heap memory pool that is available to Comet. " +
"Only applies to off-heap mode. " +
s"$TUNING_GUIDE.")
.doubleConf
.createWithDefault(1.0)
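As a back-of-envelope sketch, the fraction simply scales the off-heap pool available to Comet (`comet_pool_limit` is a hypothetical helper for illustration, not part of the Comet codebase):

```rust
/// Hypothetical helper: the slice of Spark's off-heap pool available to Comet,
/// assuming the fraction linearly scales `spark.memory.offHeap.size`.
fn comet_pool_limit(off_heap_size_bytes: u64, fraction: f64) -> u64 {
    // Mirrors the config's checkValue-style constraint: fraction in (0, 1].
    assert!(fraction > 0.0 && fraction <= 1.0, "fraction must be in (0, 1]");
    (off_heap_size_bytes as f64 * fraction) as u64
}
```

With the default fraction of `1.0`, the limit equals the full off-heap size, which is why the default preserves existing behavior.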
Member Author: Default is 1.0 so that this change is not a breaking change


val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
7 changes: 2 additions & 5 deletions docs/source/user-guide/latest/configs.md
@@ -49,8 +49,8 @@ Comet provides the following configuration settings.
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.onHeap.enabled | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution when running Spark in off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | fair_unified |
| spark.comet.exec.memoryPool.fraction | Fraction of off-heap memory pool that is available to Comet. Only applies to off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
@@ -69,9 +69,6 @@ Comet provides the following configuration settings.
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
| spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
| spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true |
| spark.comet.native.shuffle.partitioning.range.enabled | Whether to enable range partitioning for Comet native shuffle. | true |
99 changes: 12 additions & 87 deletions docs/source/user-guide/latest/tuning.md
@@ -36,41 +36,15 @@ than before. See the [Determining How Much Memory to Allocate] section for more

[Determining How Much Memory to Allocate]: #determining-how-much-memory-to-allocate

Comet supports Spark's on-heap (the default) and off-heap mode for allocating memory. However, we strongly recommend
using off-heap mode. Comet has some limitations when running in on-heap mode, such as requiring more memory overall,
and requiring shuffle memory to be separately configured.
### Configuring Comet Memory

### Configuring Comet Memory in Off-Heap Mode

The recommended way to allocate memory for Comet is to set `spark.memory.offHeap.enabled=true`. This allows
Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is
Comet shares an off-heap memory pool with Spark. The size of the pool is
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to
[Spark documentation]. For full details on configuring Comet memory in off-heap mode, see the [Advanced Memory Tuning]
section of this guide.

[Spark documentation]: https://spark.apache.org/docs/latest/configuration.html
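For example, a minimal set of Spark properties enabling the shared off-heap pool (the `8g` size is illustrative only; choose a value appropriate for your workload):

```
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=8g
```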

### Configuring Comet Memory in On-Heap Mode

```{warning}
Support for on-heap memory pools is deprecated and will be removed from a future release.
```

Comet is disabled by default in on-heap mode, but can be enabled by setting `spark.comet.exec.onHeap.enabled=true`.

When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting
is not provided, it will be calculated by multiplying the current Spark executor memory by
`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
Comet to operate. It is not recommended to rely on this behavior. It is better to specify `spark.comet.memoryOverhead`
explicitly.

Comet supports native shuffle and columnar shuffle (these terms are explained in the [shuffle] section below).
In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by
`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
then the query will fail. For full details on configuring Comet memory in on-heap mode, see the [Advanced Memory Tuning]
section of this guide.

[shuffle]: #shuffle

[Advanced Memory Tuning]: #advanced-memory-tuning
@@ -93,7 +67,7 @@ Baseline Spark Performance

Comet Performance

- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap mode, but performance at this level
- Comet requires at least 5 GB of RAM, but performance at this level
Contributor: is it onheap or offheap?

is around 340 seconds, which is significantly faster than Spark with any amount of RAM
- Comet running in off-heap with 8 cores completes the benchmark in 295 seconds, more than 2x faster than Spark
- It is worth noting that running Comet with only 4 cores and 4 GB RAM completes the benchmark in 520 seconds,
@@ -114,79 +88,30 @@ Workarounds for this problem include:

## Advanced Memory Tuning

### Configuring Off-Heap Memory Pools
### Configuring Comet Memory Pools

Comet provides multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.

The valid pool types for off-heap mode are:
The valid pool types are:

- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `greedy_unified`

Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`.

The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
need to spill or have a single spillable operator.
Comet's memory accounting isn't 100% accurate and this can result in Comet using more memory than it reserves,
leading to out-of-memory exceptions. To work around this issue, it is possible to
set `spark.comet.exec.memoryPool.fraction` to a value less than `1.0` to restrict the amount of memory that can be
reserved by Comet.

The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
(i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
when there is sufficient memory in order to leave enough memory for other operators.
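The even-share cap described above is just an integer division over the current number of reservations; a minimal hypothetical helper (not Comet's actual implementation):

```rust
/// Even-share cap used by a fair pool: each reservation may use at most
/// pool_size / num_reservations bytes.
fn fair_reservation_limit(pool_size: usize, num_reservations: usize) -> usize {
    // Guard against division by zero instead of panicking on a bare unwrap.
    pool_size
        .checked_div(num_reservations)
        .expect("num_reservations must be non-zero")
}
```

Note that the cap shrinks as more reservations register, which is exactly why this pool can trigger spills even when memory is still free overall.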

### Configuring On-Heap Memory Pools

```{warning}
Support for on-heap memory pools is deprecated and will be removed from a future release.
```

When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.

The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.

The valid pool types for on-heap mode are:

- `greedy`
- `greedy_global`
- `greedy_task_shared`
- `fair_spill`
- `fair_spill_global`
- `fair_spill_task_shared`
- `unbounded`

Pool types ending with `_global` use a single global memory pool between all tasks on same executor.

Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task.

Other pool types create a dedicated pool per native query plan using a fraction of the available pool size based on number of cores
and cores per task.

The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This
pool works well for queries that do not need to spill or have a single spillable operator.

The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more
than an even fraction of the available memory sans any unspillable reservations
(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`). This pool works best when you know beforehand
the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even
when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in
a first-come, first-serve fashion.

The `unbounded` pool type uses DataFusion's [UnboundedMemoryPool], which enforces no limit. This option is useful for
development/testing purposes, where there is no room to allow spilling and rather choose to fail the job.
Spilling significantly slows down the job and this option is one way to measure the best performance scenario without
adjusting how much memory to allocate.

[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
[UnboundedMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html

### Configuring spark.executor.memoryOverhead in On-Heap Mode

In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
that it is possible to allocate off-heap memory when running in on-heap mode.

Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
resource managers respect Apache Spark memory configuration before starting the containers.
The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
need to spill or have a single spillable operator.
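A toy sketch of the greedy first-come first-serve behavior described above (illustrative only; the real pool is DataFusion-based and backed by Spark's unified memory manager):

```rust
/// Toy greedy pool: grants requests in arrival order until the pool is exhausted.
struct GreedyPool {
    pool_size: usize,
    used: usize,
}

impl GreedyPool {
    fn new(pool_size: usize) -> Self {
        Self { pool_size, used: 0 }
    }

    /// Returns Err when the request would exceed the pool, signalling the
    /// caller to spill or fail. (Toy code: ignores integer overflow.)
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if self.used + additional > self.pool_size {
            Err(format!(
                "cannot reserve {} bytes: {} of {} already used",
                additional, self.used, self.pool_size
            ))
        } else {
            self.used += additional;
            Ok(())
        }
    }
}
```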

## Optimizing Joins

6 changes: 2 additions & 4 deletions native/core/src/execution/memory_pools/config.rs
@@ -62,9 +62,7 @@ pub(crate) fn parse_memory_pool_config(
let pool_size = memory_limit as usize;
let memory_pool_config = if off_heap_mode {
match memory_pool_type.as_str() {
"default" | "fair_unified" => {
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size)
}
"fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
"greedy_unified" => {
// the `unified` memory pool interacts with Spark's memory pool to allocate
// memory therefore does not need a size to be explicitly set. The pool size
@@ -84,7 +82,7 @@ pub(crate) fn parse_memory_pool_config(
"fair_spill_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task)
}
"default" | "greedy_task_shared" => {
"greedy_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task)
}
"fair_spill_global" => {
10 changes: 8 additions & 2 deletions native/core/src/execution/memory_pools/fair_pool.rs
@@ -133,7 +133,10 @@ impl MemoryPool for CometFairMemoryPool {
if additional > 0 {
let mut state = self.state.lock();
let num = state.num;
let limit = self.pool_size.checked_div(num).unwrap();
let limit = self
.pool_size
.checked_div(num)
.expect("division by zero in checked_div");
let size = reservation.size();
if limit < size + additional {
return resources_err!(
@@ -155,7 +158,10 @@ impl MemoryPool for CometFairMemoryPool {
state.used
);
}
state.used = state.used.checked_add(additional).unwrap();
state.used = state
.used
.checked_add(additional)
.expect("overflow in checked_add");
}
Ok(())
}
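The hunks above replace bare `unwrap()` calls with `expect(...)` so that a panic carries a descriptive message; the same pattern in isolation (standalone illustration, not the Comet source):

```rust
/// Checked addition with a descriptive panic message: `checked_add` returns
/// `None` on overflow, and `expect` names the failure instead of producing
/// an anonymous unwrap panic.
fn add_reserved(used: usize, additional: usize) -> usize {
    used.checked_add(additional)
        .expect("overflow when adding to reserved memory")
}
```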