diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml index e607daf4f1..aa59d2ff74 100644 --- a/.github/workflows/iceberg_spark_test.yml +++ b/.github/workflows/iceberg_spark_test.yml @@ -75,7 +75,7 @@ jobs: run: | cd apache-iceberg rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \ + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \ :iceberg-spark:iceberg-spark-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \ -Pquick=true -x javadoc @@ -114,7 +114,7 @@ jobs: run: | cd apache-iceberg rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \ + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \ :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \ -Pquick=true -x javadoc @@ -153,6 +153,6 @@ jobs: run: | cd apache-iceberg rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \ + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \ :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark-version.short 
}}_${{ matrix.scala-version }}:integrationTest \ -Pquick=true -x javadoc \ No newline at end of file diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 824a25ad6c..113fb3072c 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -81,7 +81,7 @@ jobs: run: | cd apache-spark rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \ + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \ build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log @@ -130,7 +130,7 @@ jobs: run: | cd apache-spark rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_comet ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \ + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_comet ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \ build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then find . 
-type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log @@ -179,7 +179,7 @@ jobs: run: | cd apache-spark rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \ + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \ build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log diff --git a/.github/workflows/spark_sql_test_ansi.yml b/.github/workflows/spark_sql_test_ansi.yml index 0f579ef3fb..b97cd78419 100644 --- a/.github/workflows/spark_sql_test_ansi.yml +++ b/.github/workflows/spark_sql_test_ansi.yml @@ -73,7 +73,7 @@ jobs: run: | cd apache-spark rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" + RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_ANSI_MODE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" env: LC_ALL: "C.UTF-8" diff --git a/.github/workflows/spark_sql_test_native_datafusion.yml b/.github/workflows/spark_sql_test_native_datafusion.yml index cc786f0d50..3c20545916 100644 --- a/.github/workflows/spark_sql_test_native_datafusion.yml +++ 
b/.github/workflows/spark_sql_test_native_datafusion.yml @@ -65,7 +65,7 @@ jobs: run: | cd apache-spark rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_datafusion build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_datafusion build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" env: LC_ALL: "C.UTF-8" diff --git a/.github/workflows/spark_sql_test_native_iceberg_compat.yml b/.github/workflows/spark_sql_test_native_iceberg_compat.yml index 7a9322bc2d..7457e9ae35 100644 --- a/.github/workflows/spark_sql_test_native_iceberg_compat.yml +++ b/.github/workflows/spark_sql_test_native_iceberg_compat.yml @@ -65,7 +65,7 @@ jobs: run: | cd apache-spark rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups - ENABLE_COMET=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" + ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=native_iceberg_compat build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}" env: LC_ALL: "C.UTF-8" diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 05c0750571..bf86846625 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -510,6 +510,12 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] = + conf("spark.comet.exec.onHeap.enabled") + .doc("Whether to allow Comet to run in on-heap mode. 
Required for running Spark SQL tests.") + .booleanConf + .createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean) + val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool") .doc( "The type of memory pool to be used for Comet native execution. " + diff --git a/docs/source/contributor-guide/spark-sql-tests.md b/docs/source/contributor-guide/spark-sql-tests.md index 5666c8cebb..d19efe9347 100644 --- a/docs/source/contributor-guide/spark-sql-tests.md +++ b/docs/source/contributor-guide/spark-sql-tests.md @@ -57,13 +57,13 @@ git apply ../datafusion-comet/dev/diffs/3.4.3.diff ### Use the following commands to run the Spark SQL test suite locally. ```shell -ENABLE_COMET=true build/sbt catalyst/test -ENABLE_COMET=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest" -ENABLE_COMET=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest" -ENABLE_COMET=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest" -ENABLE_COMET=true build/sbt "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest" -ENABLE_COMET=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest" -ENABLE_COMET=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest" +ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt catalyst/test +ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest" +ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest" +ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest" +ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest" +ENABLE_COMET=true 
ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest" +ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest" ``` ### Steps to run individual test suites through SBT 1. Open SBT with Comet enabled diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 554d34e353..13efe7024e 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -50,6 +50,7 @@ Comet provides the following configuration settings. | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default | +| spark.comet.exec.onHeap.enabled | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). 
| false | | spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 03aa8793b4..585d5a9d51 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -45,10 +45,19 @@ and requiring shuffle memory to be separately configured. The recommended way to allocate memory for Comet is to set `spark.memory.offHeap.enabled=true`. This allows Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to -Spark documentation: https://spark.apache.org/docs/latest/configuration.html. +[Spark documentation]. For full details on configuring Comet memory in off-heap mode, see the [Advanced Memory Tuning] +section of this guide. + +[Spark documentation]: https://spark.apache.org/docs/latest/configuration.html ### Configuring Comet Memory in On-Heap Mode +```{warning} +Support for on-heap memory pools is deprecated and will be removed in a future release. +``` + +Comet is disabled by default in on-heap mode, but can be enabled by setting `spark.comet.exec.onHeap.enabled=true`. + When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting is not provided, it will be calculated by multiplying the current Spark executor memory by `spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for @@ -59,10 +68,13 @@ Comet supports native shuffle and columnar shuffle (these terms are explained in In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by `spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory -then the query will fail. +then the query will fail. For full details on configuring Comet memory in on-heap mode, see the [Advanced Memory Tuning] +section of this guide. [shuffle]: #shuffle +[Advanced Memory Tuning]: #advanced-memory-tuning + ### Determining How Much Memory to Allocate Generally, increasing the amount of memory allocated to Comet will improve query performance by reducing the @@ -102,14 +114,6 @@ Workarounds for this problem include: ## Advanced Memory Tuning -### Configuring spark.executor.memoryOverhead in On-Heap Mode - -In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so -that it is possible to allocate off-heap memory when running in on-heap mode. - -Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that -resource managers respect Apache Spark memory configuration before starting the containers. - ### Configuring Off-Heap Memory Pools Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`. @@ -132,6 +136,10 @@ when there is sufficient memory in order to leave enough memory for other operat ### Configuring On-Heap Memory Pools +```{warning} +Support for on-heap memory pools is deprecated and will be removed in a future release. +``` + When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark. The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`. @@ -172,6 +180,14 @@ adjusting how much memory to allocate.
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html [UnboundedMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html +### Configuring spark.executor.memoryOverhead in On-Heap Mode + +In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so +that it is possible to allocate off-heap memory when running in on-heap mode. + +Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that +resource managers respect Apache Spark memory configuration before starting the containers. + ## Optimizing Joins Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala index 1f3d4bba84..863d5f87af 100644 --- a/spark/src/main/scala/org/apache/spark/Plugins.scala +++ b/spark/src/main/scala/org/apache/spark/Plugins.scala @@ -28,7 +28,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR} import org.apache.spark.sql.internal.StaticSQLConf -import org.apache.comet.CometSparkSessionExtensions +import org.apache.comet.{CometConf, CometSparkSessionExtensions} +import org.apache.comet.CometConf.COMET_ENABLE_ONHEAP_MODE /** * Comet driver plugin. This class is loaded by Spark's plugin framework. 
It will be instantiated @@ -47,6 +48,15 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl override def init(sc: SparkContext, pluginContext: PluginContext): ju.Map[String, String] = { logInfo("CometDriverPlugin init") + if (!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf) && + !sc.getConf.getBoolean(COMET_ENABLE_ONHEAP_MODE.key, false)) { + logWarning( + "Comet plugin is disabled because Spark is not running in off-heap mode. Set " + + s"${CometConf.COMET_ENABLE_ONHEAP_MODE.key}=true to override this restriction " + + "(for testing purposes only).") + return Collections.emptyMap[String, String] + } + // register CometSparkSessionExtensions if it isn't already registered CometDriverPlugin.registerCometSessionExtension(sc.conf) diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala index 93dc10550c..99a37bb02a 100644 --- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala @@ -31,6 +31,7 @@ class CometPluginsSuite extends CometTestBase { conf.set("spark.plugins", "org.apache.spark.CometPlugin") conf.set("spark.comet.enabled", "true") conf.set("spark.comet.exec.enabled", "true") + conf.set("spark.comet.exec.onHeap.enabled", "true") conf } @@ -99,6 +100,7 @@ class CometPluginsDefaultSuite extends CometTestBase { conf.set("spark.plugins", "org.apache.spark.CometPlugin") conf.set("spark.comet.enabled", "true") conf.set("spark.comet.exec.shuffle.enabled", "true") + conf.set("spark.comet.exec.onHeap.enabled", "true") conf } @@ -128,6 +130,7 @@ class CometPluginsNonOverrideSuite extends CometTestBase { conf.set("spark.comet.enabled", "true") conf.set("spark.comet.exec.shuffle.enabled", "false") conf.set("spark.comet.exec.enabled", "false") + conf.set("spark.comet.exec.onHeap.enabled", "true") conf } diff --git 
a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index d4b7b029a3..844bd07f3b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -77,6 +77,7 @@ abstract class CometTestBase conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") conf.set(CometConf.COMET_ENABLED.key, "true") + conf.set(CometConf.COMET_ENABLE_ONHEAP_MODE.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")