Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/iceberg_spark_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
run: |
cd apache-iceberg
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
:iceberg-spark:iceberg-spark-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
-Pquick=true -x javadoc

Expand Down Expand Up @@ -114,7 +114,7 @@ jobs:
run: |
cd apache-iceberg
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
:iceberg-spark:iceberg-spark-extensions-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
-Pquick=true -x javadoc

Expand Down Expand Up @@ -153,6 +153,6 @@ jobs:
run: |
cd apache-iceberg
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
:iceberg-spark:iceberg-spark-runtime-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:integrationTest \
-Pquick=true -x javadoc
6 changes: 3 additions & 3 deletions .github/workflows/spark_sql_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
Expand Down Expand Up @@ -130,7 +130,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_comet ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_comet ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
Expand Down Expand Up @@ -179,7 +179,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/spark_sql_test_ansi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_ANSI_MODE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

2 changes: 1 addition & 1 deletion .github/workflows/spark_sql_test_native_datafusion.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_datafusion build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_datafusion build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

2 changes: 1 addition & 1 deletion .github/workflows/spark_sql_test_native_iceberg_compat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,12 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
conf("spark.comet.exec.onHeap.enabled")
.doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
.booleanConf
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)

val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
.doc(
"The type of memory pool to be used for Comet native execution. " +
Expand Down
14 changes: 7 additions & 7 deletions docs/source/contributor-guide/spark-sql-tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ git apply ../datafusion-comet/dev/diffs/3.4.3.diff
### Use the following commands to run the Spark SQL test suite locally.

```shell
ENABLE_COMET=true build/sbt catalyst/test
ENABLE_COMET=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"
ENABLE_COMET=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"
ENABLE_COMET=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"
ENABLE_COMET=true build/sbt "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"
ENABLE_COMET=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"
ENABLE_COMET=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt catalyst/test
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"
```
### Steps to run individual test suites through SBT
1. Open SBT with Comet enabled
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.onHeap.enabled | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
Expand Down
36 changes: 26 additions & 10 deletions docs/source/user-guide/latest/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,19 @@ and requiring shuffle memory to be separately configured.
The recommended way to allocate memory for Comet is to set `spark.memory.offHeap.enabled=true`. This allows
Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to
Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
[Spark documentation]. For full details on configuring Comet memory in off-heap mode, see the [Advanced Memory Tuning]
section of this guide.

[Spark documentation]: https://spark.apache.org/docs/latest/configuration.html

### Configuring Comet Memory in On-Heap Mode

```{warning}
Support for on-heap memory pools is deprecated and will be removed from a future release.
```

Comet is disabled by default in on-heap mode, but can be enabled by setting `spark.comet.exec.onHeap.enabled=true`.

When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting
is not provided, it will be calculated by multiplying the current Spark executor memory by
`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
Expand All @@ -59,10 +68,13 @@ Comet supports native shuffle and columnar shuffle (these terms are explained in
In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by
`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
then the query will fail.
then the query will fail. For full details on configuring Comet memory in on-heap mode, see the [Advanced Memory Tuning]
section of this guide.

[shuffle]: #shuffle

[Advanced Memory Tuning]: #advanced-memory-tuning

### Determining How Much Memory to Allocate

Generally, increasing the amount of memory allocated to Comet will improve query performance by reducing the
Expand Down Expand Up @@ -102,14 +114,6 @@ Workarounds for this problem include:

## Advanced Memory Tuning

### Configuring spark.executor.memoryOverhead in On-Heap Mode

In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
that it is possible to allocate off-heap memory when running in on-heap mode.

Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
resource managers respect Apache Spark memory configuration before starting the containers.

### Configuring Off-Heap Memory Pools

Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.
Expand All @@ -132,6 +136,10 @@ when there is sufficient memory in order to leave enough memory for other operat

### Configuring On-Heap Memory Pools

```{warning}
Support for on-heap memory pools is deprecated and will be removed from a future release.
```

When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.

The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.
Expand Down Expand Up @@ -172,6 +180,14 @@ adjusting how much memory to allocate.
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
[UnboundedMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html

### Configuring spark.executor.memoryOverhead in On-Heap Mode

In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
that it is possible to allocate off-heap memory when running in on-heap mode.

Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
resource managers respect Apache Spark memory configuration before starting the containers.

## Optimizing Joins

Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a
Expand Down
12 changes: 11 additions & 1 deletion spark/src/main/scala/org/apache/spark/Plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf

import org.apache.comet.CometSparkSessionExtensions
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
import org.apache.comet.CometConf.COMET_ENABLE_ONHEAP_MODE

/**
* Comet driver plugin. This class is loaded by Spark's plugin framework. It will be instantiated
Expand All @@ -47,6 +48,15 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
override def init(sc: SparkContext, pluginContext: PluginContext): ju.Map[String, String] = {
logInfo("CometDriverPlugin init")

if (!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf) &&
!sc.getConf.getBoolean(COMET_ENABLE_ONHEAP_MODE.key, false)) {
logWarning(
"Comet plugin is disabled because Spark is not running in off-heap mode. Set " +
s"${CometConf.COMET_ENABLE_ONHEAP_MODE.key}=true to override this restriction " +
"(for testing purposes only).")
return Collections.emptyMap[String, String]
}

// register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf)

Expand Down
3 changes: 3 additions & 0 deletions spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class CometPluginsSuite extends CometTestBase {
conf.set("spark.plugins", "org.apache.spark.CometPlugin")
conf.set("spark.comet.enabled", "true")
conf.set("spark.comet.exec.enabled", "true")
conf.set("spark.comet.exec.onHeap.enabled", "true")
conf
}

Expand Down Expand Up @@ -99,6 +100,7 @@ class CometPluginsDefaultSuite extends CometTestBase {
conf.set("spark.plugins", "org.apache.spark.CometPlugin")
conf.set("spark.comet.enabled", "true")
conf.set("spark.comet.exec.shuffle.enabled", "true")
conf.set("spark.comet.exec.onHeap.enabled", "true")
conf
}

Expand Down Expand Up @@ -128,6 +130,7 @@ class CometPluginsNonOverrideSuite extends CometTestBase {
conf.set("spark.comet.enabled", "true")
conf.set("spark.comet.exec.shuffle.enabled", "false")
conf.set("spark.comet.exec.enabled", "false")
conf.set("spark.comet.exec.onHeap.enabled", "true")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ abstract class CometTestBase
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_ENABLE_ONHEAP_MODE.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
Expand Down
Loading