
Commit c214049

feat: Add config to enable running Comet in onheap mode (#2554)

1 parent: c27a426

File tree: 12 files changed (+64, -27 lines)

.github/workflows/iceberg_spark_test.yml

Lines changed: 3 additions & 3 deletions
```diff
@@ -75,7 +75,7 @@ jobs:
       run: |
         cd apache-iceberg
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
           :iceberg-spark:iceberg-spark-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
           -Pquick=true -x javadoc
@@ -114,7 +114,7 @@ jobs:
       run: |
         cd apache-iceberg
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
           :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
           -Pquick=true -x javadoc
@@ -153,6 +153,6 @@ jobs:
       run: |
         cd apache-iceberg
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
           :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:integrationTest \
           -Pquick=true -x javadoc
```

.github/workflows/spark_sql_test.yml

Lines changed: 3 additions & 3 deletions
```diff
@@ -81,7 +81,7 @@ jobs:
       run: |
         cd apache-spark
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
           build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
         if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
           find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
@@ -130,7 +130,7 @@ jobs:
       run: |
         cd apache-spark
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_comet ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_comet ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
           build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
         if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
           find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
@@ -179,7 +179,7 @@ jobs:
       run: |
         cd apache-spark
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
           build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
         if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
           find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
```

.github/workflows/spark_sql_test_ansi.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -73,7 +73,7 @@ jobs:
       run: |
         cd apache-spark
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+        RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_ANSI_MODE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
       env:
         LC_ALL: "C.UTF-8"
```

.github/workflows/spark_sql_test_native_datafusion.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -65,7 +65,7 @@ jobs:
       run: |
         cd apache-spark
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_datafusion build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_datafusion build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
       env:
         LC_ALL: "C.UTF-8"
```

.github/workflows/spark_sql_test_native_iceberg_compat.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -65,7 +65,7 @@ jobs:
       run: |
         cd apache-spark
         rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
-        ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+        ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
       env:
         LC_ALL: "C.UTF-8"
```

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -510,6 +510,12 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

+  val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
+    conf("spark.comet.exec.onHeap.enabled")
+      .doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
+      .booleanConf
+      .createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)
+
   val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
     .doc(
       "The type of memory pool to be used for Comet native execution. " +
```

docs/source/contributor-guide/spark-sql-tests.md

Lines changed: 7 additions & 7 deletions
````diff
@@ -57,13 +57,13 @@ git apply ../datafusion-comet/dev/diffs/3.4.3.diff
 ### Use the following commands to run the Spark SQL test suite locally.

 ```shell
-ENABLE_COMET=true build/sbt catalyst/test
-ENABLE_COMET=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"
-ENABLE_COMET=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"
-ENABLE_COMET=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"
-ENABLE_COMET=true build/sbt "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"
-ENABLE_COMET=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"
-ENABLE_COMET=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt catalyst/test
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"
+ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"
 ```
 ### Steps to run individual test suites through SBT
 1. Open SBT with Comet enabled
````

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -50,6 +50,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
 | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
 | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
+| spark.comet.exec.onHeap.enabled | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
 | spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
```
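To confirm the effective value of the new entry at runtime, a quick hedged check from an active session (assumes the Comet plugin is loaded; the fallback default mirrors the table above):

```scala
// Returns "false" unless the conf or the ENABLE_COMET_ONHEAP env var set it.
spark.conf.get("spark.comet.exec.onHeap.enabled", "false")
```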

docs/source/user-guide/latest/tuning.md

Lines changed: 26 additions & 10 deletions
````diff
@@ -45,10 +45,19 @@ and requiring shuffle memory to be separately configured.
 The recommended way to allocate memory for Comet is to set `spark.memory.offHeap.enabled=true`. This allows
 Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is
 specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to
-Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
+[Spark documentation]. For full details on configuring Comet memory in off-heap mode, see the [Advanced Memory Tuning]
+section of this guide.
+
+[Spark documentation]: https://spark.apache.org/docs/latest/configuration.html

 ### Configuring Comet Memory in On-Heap Mode

+```{warning}
+Support for on-heap memory pools is deprecated and will be removed from a future release.
+```
+
+Comet is disabled by default in on-heap mode, but can be enabled by setting `spark.comet.exec.onHeap.enabled=true`.
+
 When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting
 is not provided, it will be calculated by multiplying the current Spark executor memory by
 `spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
````
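The off-heap setup recommended in the hunk above reduces to a handful of settings; a minimal sketch (the pool size is illustrative, not a recommendation from this commit):

```scala
import org.apache.spark.SparkConf

// Recommended path: Comet shares Spark's off-heap pool.
val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.CometPlugin")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "4g") // size this for your workload
```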
```diff
@@ -59,10 +68,13 @@ Comet supports native shuffle and columnar shuffle (these terms are explained in
 In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
 If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by
 `spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
-then the query will fail.
+then the query will fail. For full details on configuring Comet memory in on-heap mode, see the [Advanced Memory Tuning]
+section of this guide.

 [shuffle]: #shuffle

+[Advanced Memory Tuning]: #advanced-memory-tuning
+
 ### Determining How Much Memory to Allocate

 Generally, increasing the amount of memory allocated to Comet will improve query performance by reducing the
```
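The on-heap sizing rules documented above amount to two multiplications. A sketch of the arithmetic (the helper names are hypothetical, not Comet APIs; Comet performs this calculation internally):

```scala
// spark.comet.memoryOverhead defaults to executor memory * 0.2
def cometMemoryOverheadMb(executorMemoryMb: Long, factor: Double = 0.2): Long =
  (executorMemoryMb * factor).toLong

// Columnar shuffle memory defaults to memoryOverhead * 1.0
def columnarShuffleMemoryMb(overheadMb: Long, factor: Double = 1.0): Long =
  (overheadMb * factor).toLong

val overhead = cometMemoryOverheadMb(8192)       // 8 GiB executor -> 1638 MB
val shuffle  = columnarShuffleMemoryMb(overhead) // -> 1638 MB
```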
````diff
@@ -102,14 +114,6 @@ Workarounds for this problem include:

 ## Advanced Memory Tuning

-### Configuring spark.executor.memoryOverhead in On-Heap Mode
-
-In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
-that it is possible to allocate off-heap memory when running in on-heap mode.
-
-Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
-resource managers respect Apache Spark memory configuration before starting the containers.
-
 ### Configuring Off-Heap Memory Pools

 Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.
@@ -132,6 +136,10 @@ when there is sufficient memory in order to leave enough memory for other operat

 ### Configuring On-Heap Memory Pools

+```{warning}
+Support for on-heap memory pools is deprecated and will be removed from a future release.
+```
+
 When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.

 The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.
@@ -172,6 +180,14 @@ adjusting how much memory to allocate.
 [FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
 [UnboundedMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html

+### Configuring spark.executor.memoryOverhead in On-Heap Mode
+
+In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
+that it is possible to allocate off-heap memory when running in on-heap mode.
+
+Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
+resource managers respect Apache Spark memory configuration before starting the containers.
+
 ## Optimizing Joins

 Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a
````
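The pool-type selection described in these hunks is a single config; a hedged example pairing it with the new flag, as this commit now requires for on-heap pools (pool name taken from the documented default):

```scala
import org.apache.spark.SparkConf

// Illustrative only: pick an on-heap pool type explicitly.
val poolConf = new SparkConf()
  .set("spark.comet.exec.onHeap.enabled", "true") // on-heap pools require the new flag
  .set("spark.comet.exec.memoryPool", "greedy_task_shared") // the documented default
```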

spark/src/main/scala/org/apache/spark/Plugins.scala

Lines changed: 11 additions & 1 deletion
```diff
@@ -28,7 +28,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
 import org.apache.spark.sql.internal.StaticSQLConf

-import org.apache.comet.CometSparkSessionExtensions
+import org.apache.comet.{CometConf, CometSparkSessionExtensions}
+import org.apache.comet.CometConf.COMET_ENABLE_ONHEAP_MODE

 /**
  * Comet driver plugin. This class is loaded by Spark's plugin framework. It will be instantiated
@@ -47,6 +48,15 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
   override def init(sc: SparkContext, pluginContext: PluginContext): ju.Map[String, String] = {
     logInfo("CometDriverPlugin init")

+    if (!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf) &&
+      !sc.getConf.getBoolean(COMET_ENABLE_ONHEAP_MODE.key, false)) {
+      logWarning(
+        "Comet plugin is disabled because Spark is not running in off-heap mode. Set " +
+          s"${CometConf.COMET_ENABLE_ONHEAP_MODE.key}=true to override this restriction " +
+          "(for testing purposes only).")
+      return Collections.emptyMap[String, String]
+    }
+
     // register CometSparkSessionExtensions if it isn't already registered
     CometDriverPlugin.registerCometSessionExtension(sc.conf)
```
