Commit c9f1b77

Merge branch 'apache:main' into dev
2 parents e12c548 + 4f8ce75

File tree: 177 files changed (+9296, −4769)


.github/actions/setup-spark-builder/action.yaml

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ inputs:
   comet-version:
     description: 'The Comet version to use for Spark'
     required: true
-    default: '0.4.0-SNAPSHOT'
+    default: '0.5.0-SNAPSHOT'
 runs:
   using: "composite"
   steps:

.github/workflows/spark_sql_test.yml

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ jobs:
         with:
           spark-version: ${{ matrix.spark-version.full }}
           spark-short-version: ${{ matrix.spark-version.short }}
-          comet-version: '0.4.0-SNAPSHOT' # TODO: get this from pom.xml
+          comet-version: '0.5.0-SNAPSHOT' # TODO: get this from pom.xml
       - name: Run Spark tests
         run: |
           cd apache-spark

.github/workflows/spark_sql_test_ansi.yml

Lines changed: 1 addition & 1 deletion

@@ -69,7 +69,7 @@ jobs:
         with:
           spark-version: ${{ matrix.spark-version.full }}
           spark-short-version: ${{ matrix.spark-version.short }}
-          comet-version: '0.4.0-SNAPSHOT' # TODO: get this from pom.xml
+          comet-version: '0.5.0-SNAPSHOT' # TODO: get this from pom.xml
       - name: Run Spark tests
         run: |
           cd apache-spark

README.md

Lines changed: 6 additions & 6 deletions

@@ -46,7 +46,7 @@ The following chart shows the time it takes to run the 22 TPC-H queries against
 using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
 for details of the environment used for these benchmarks.

-When using Comet, the overall run time is reduced from 616 seconds to 374 seconds, a 1.6x speedup, with query 1
+When using Comet, the overall run time is reduced from 615 seconds to 364 seconds, a 1.7x speedup, with query 1
 running 9x faster than Spark.

 Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.6x
@@ -55,21 +55,21 @@ speedup compared to Spark.
 Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
 for a broader set of queries.

-![](docs/source/_static/images/benchmark-results/0.3.0/tpch_allqueries.png)
+![](docs/source/_static/images/benchmark-results/0.4.0/tpch_allqueries.png)

 Here is a breakdown showing relative performance of Spark, Comet, and DataFusion for each TPC-H query.

-![](docs/source/_static/images/benchmark-results/0.3.0/tpch_queries_compare.png)
+![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_compare.png)

 The following charts shows how much Comet currently accelerates each query from the benchmark.

 ### Relative speedup

-![](docs/source/_static/images/benchmark-results/0.3.0/tpch_queries_speedup_rel.png)
+![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_speedup_rel.png)

 ### Absolute speedup

-![](docs/source/_static/images/benchmark-results/0.3.0/tpch_queries_speedup_abs.png)
+![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_speedup_abs.png)

 These benchmarks can be reproduced in any environment using the documentation in the
 [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html). We encourage
@@ -80,7 +80,7 @@ Results for our benchmark derived from TPC-DS are available in the [benchmarking
 ## Use Commodity Hardware

 Comet leverages commodity hardware, eliminating the need for costly hardware upgrades or
-specialized hardware accelerators, such as GPUs or FGPA. By maximizing the utilization of commodity hardware, Comet
+specialized hardware accelerators, such as GPUs or FPGA. By maximizing the utilization of commodity hardware, Comet
 ensures cost-effectiveness and scalability for your Spark deployments.

 ## Spark Compatibility

benchmarks/README.md

Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ docker push localhost:32000/apache/datafusion-comet-tpcbench:latest
 export SPARK_MASTER=k8s://https://127.0.0.1:16443
 export COMET_DOCKER_IMAGE=localhost:32000/apache/datafusion-comet-tpcbench:latest
 # Location of Comet JAR within the Docker image
-export COMET_JAR=/opt/spark/jars/comet-spark-spark3.4_2.12-0.2.0-SNAPSHOT.jar
+export COMET_JAR=/opt/spark/jars/comet-spark-spark3.4_2.12-0.5.0-SNAPSHOT.jar

 $SPARK_HOME/bin/spark-submit \
   --master $SPARK_MASTER \

common/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ under the License.
   <parent>
     <groupId>org.apache.datafusion</groupId>
     <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 65 additions & 45 deletions

@@ -73,7 +73,7 @@ object CometConf extends ShimCometConf {
       "Whether to enable native scans. When this is turned on, Spark will use Comet to " +
         "read supported data sources (currently only Parquet is supported natively). Note " +
         "that to enable native vectorized execution, both this config and " +
-        "'spark.comet.exec.enabled' need to be enabled. By default, this config is true.")
+        "'spark.comet.exec.enabled' need to be enabled.")
     .booleanConf
     .createWithDefault(true)
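The hunk above only drops the redundant "By default, this config is true." sentence; the default still comes from createWithDefault. Since the doc says native scans and 'spark.comet.exec.enabled' must both be on for native vectorized execution, here is a minimal sketch of a session enabling both. The plugin class name is taken from the Comet documentation, not from this diff, so verify it for your release:

import org.apache.spark.sql.SparkSession

// A hedged sketch, not part of this commit: enable Comet native scans and
// native execution together, as the doc above requires both.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin") // assumed from Comet docs
  .config("spark.comet.scan.enabled", "true") // native Parquet scans
  .config("spark.comet.exec.enabled", "true") // native operator execution
  .getOrCreate()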

@@ -82,7 +82,7 @@ object CometConf extends ShimCometConf {
     .doc(
       "Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads " +
         "ranges of consecutive data in a file in parallel. It is faster for large files and " +
-        "row groups but uses more resources. The parallel reader is enabled by default.")
+        "row groups but uses more resources.")
     .booleanConf
     .createWithDefault(true)

@@ -98,7 +98,7 @@ object CometConf extends ShimCometConf {
     .doc(
       "When enabled the parallel reader will try to merge ranges of data that are separated " +
         "by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads " +
-        "are faster on cloud storage. The default behavior is to merge consecutive ranges.")
+        "are faster on cloud storage.")
     .booleanConf
     .createWithDefault(true)

@@ -115,7 +115,7 @@ object CometConf extends ShimCometConf {
     .doc("In the parallel reader, if the read ranges submitted are skewed in sizes, this " +
       "option will cause the reader to break up larger read ranges into smaller ranges to " +
       "reduce the skew. This will result in a slightly larger number of connections opened to " +
-      "the file system but may give improved performance. The option is off by default.")
+      "the file system but may give improved performance.")
     .booleanConf
     .createWithDefault(false)
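For the merge-ranges behavior documented above, a hedged sketch of tuning it on an existing SparkSession named `spark`. The doc string only names 'comet.parquet.read.io.mergeRanges.delta'; the full keys below assume the usual 'spark.' prefix, and the 4 MiB value is an arbitrary illustration, not the shipped default:

// Assumed keys (verify against CometConf): merge read ranges separated by
// less than 4 MiB so cloud storage sees longer continuous reads.
spark.conf.set("spark.comet.parquet.read.io.mergeRanges", "true")
spark.conf.set("spark.comet.parquet.read.io.mergeRanges.delta", (4 * 1024 * 1024).toString)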

@@ -153,7 +153,7 @@ object CometConf extends ShimCometConf {
       "native space. Note: each operator is associated with a separate config in the " +
         "format of 'spark.comet.exec.<operator_name>.enabled' at the moment, and both the " +
         "config and this need to be turned on, in order for the operator to be executed in " +
-        "native. By default, this config is true.")
+        "native.")
     .booleanConf
     .createWithDefault(true)

@@ -215,7 +215,7 @@ object CometConf extends ShimCometConf {
       "spark.comet.memory.overhead.factor")
     .doc(
       "Fraction of executor memory to be allocated as additional non-heap memory per executor " +
-        "process for Comet. Default value is 0.2.")
+        "process for Comet.")
     .doubleConf
     .checkValue(
       factor => factor > 0,
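The deleted sentence stated the default factor of 0.2, which createWithDefault still supplies. As a worked example of what the factor means, under an assumed 16 GiB executor:

// Comet's extra non-heap memory per executor = factor * executor memory.
val executorMemoryMiB = 16 * 1024 // assumption: spark.executor.memory=16g
val overheadFactor = 0.2 // the (unchanged) default of spark.comet.memory.overhead.factor
val cometOverheadMiB = (executorMemoryMiB * overheadFactor).toLong // 3276 MiB, about 3.2 GiB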

@@ -247,8 +247,7 @@ object CometConf extends ShimCometConf {
       "is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
         "'native' is for native shuffle which has best performance in general. " +
         "'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
-        "'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
-        "By default, this config is 'auto'.")
+        "'auto' is for Comet to choose the best shuffle mode based on the query plan.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
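A hedged sketch of choosing among the three modes described above. The mode key follows the $COMET_EXEC_CONFIG_PREFIX pattern used elsewhere in this file, and the shuffle-manager class name is taken from the Comet docs rather than this hunk, so treat both as assumptions:

import org.apache.spark.sql.SparkSession

// Comet shuffle needs its shuffle manager installed at session start; the mode
// then selects 'native', 'jvm', or 'auto' as documented above.
val spark = SparkSession
  .builder()
  .config(
    "spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") // assumed from Comet docs
  .config("spark.comet.exec.shuffle.enabled", "true")
  .config("spark.comet.exec.shuffle.mode", "auto") // let Comet pick per query plan
  .getOrCreate()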

@@ -258,8 +257,8 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
-        "Whether to force enabling broadcasting for Comet native operators. By default, " +
-          "this config is false. Comet broadcast feature will be enabled automatically by " +
+        "Whether to force enabling broadcasting for Comet native operators. " +
+          "Comet broadcast feature will be enabled automatically by " +
           "Comet extension. But for unit tests, we need this feature to force enabling it " +
           "for invalid cases. So this config is only used for unit test.")
       .internal()
@@ -273,34 +272,41 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
+    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
     .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
+      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
+        "Compression can be disabled by setting spark.shuffle.compress=false.")
     .stringConf
+    .checkValues(Set("zstd"))
     .createWithDefault("zstd")

-  val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.columnar.shuffle.async.enabled")
-    .doc(
-      "Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config " +
-        "is false.")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
+      .doc("The compression level to use when compression shuffle files.")
+      .intConf
+      .createWithDefault(1)
+
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.columnar.shuffle.async.enabled")
+      .doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
+      .booleanConf
+      .createWithDefault(false)

   val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.async.thread.num")
-      .doc("Number of threads used for Comet async columnar shuffle per shuffle task. " +
-        "By default, this config is 3. Note that more threads means more memory requirement to " +
-        "buffer shuffle data before flushing to disk. Also, more threads may not always " +
-        "improve performance, and should be set based on the number of cores available.")
+      .doc(
+        "Number of threads used for Comet async columnar shuffle per shuffle task. " +
+          "Note that more threads means more memory requirement to " +
+          "buffer shuffle data before flushing to disk. Also, more threads may not always " +
+          "improve performance, and should be set based on the number of cores available.")
       .intConf
       .createWithDefault(3)

   val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
     conf("spark.comet.columnar.shuffle.async.max.thread.num")
       .doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " +
-        "By default, this config is 100. This is the upper bound of total number of shuffle " +
+        "This is the upper bound of total number of shuffle " +
         "threads per executor. In other words, if the number of cores * the number of shuffle " +
         "threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than " +
         "this config. Comet will use this config as the number of shuffle threads per " +

@@ -317,16 +323,17 @@ object CometConf extends ShimCometConf {
       "Higher value means more memory requirement to buffer shuffle data before " +
         "flushing to disk. As Comet uses columnar shuffle which is columnar format, " +
         "higher value usually helps to improve shuffle data compression ratio. This is " +
-        "internal config for testing purpose or advanced tuning. By default, " +
-        "this config is Int.Max.")
+        "internal config for testing purpose or advanced tuning.")
     .internal()
     .intConf
     .createWithDefault(Int.MaxValue)

   val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
     conf("spark.comet.columnar.shuffle.memorySize")
+      .internal()
       .doc(
-        "The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
+        "Test-only config. This is only used to test Comet shuffle with Spark tests. " +
+          "The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
           "Note that this config is only used when `spark.comet.exec.shuffle.mode` is " +
           "`jvm`. Once allocated memory size reaches this config, the current batch will be " +
           "flushed to disk immediately. If this is not configured, Comet will use " +
@@ -338,17 +345,29 @@ object CometConf extends ShimCometConf {

   val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
     conf("spark.comet.columnar.shuffle.memory.factor")
+      .internal()
       .doc(
-        "Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
+        "Test-only config. This is only used to test Comet shuffle with Spark tests. " +
+          "Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
           "Comet memory size is specified by `spark.comet.memoryOverhead` or " +
-          "calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
-          "By default, this config is 1.0.")
+          "calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`.")
       .doubleConf
       .checkValue(
         factor => factor > 0,
         "Ensure that Comet shuffle memory overhead factor is a double greater than 0")
       .createWithDefault(1.0)

+  val COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST: ConfigEntry[Boolean] =
+    conf("spark.comet.columnar.shuffle.unifiedMemoryAllocatorTest")
+      .doc("Whether to use Spark unified memory allocator for Comet columnar shuffle in tests. " +
+        "If not configured, Comet will use a test-only memory allocator for Comet columnar " +
+        "shuffle when Spark test env detected. The test-only allocator is proposed to run with " +
+        "Spark tests as these tests require on-heap memory configuration. " +
+        "By default, this config is false.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.batch.size")
       .internal()
@@ -360,11 +379,12 @@ object CometConf extends ShimCometConf {

   val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
     "spark.comet.shuffle.preferDictionary.ratio")
-    .doc("The ratio of total values to distinct values in a string column to decide whether to " +
-      "prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
-      "this config, dictionary encoding will be used on shuffling string column. This config " +
-      "is effective if it is higher than 1.0. By default, this config is 10.0. Note that this " +
-      "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
+    .doc(
+      "The ratio of total values to distinct values in a string column to decide whether to " +
+        "prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
+        "this config, dictionary encoding will be used on shuffling string column. This config " +
+        "is effective if it is higher than 1.0. Note that this " +
+        "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
     .doubleConf
     .createWithDefault(10.0)
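A worked illustration of the ratio test described above (illustrative arithmetic only, not Comet source):

// A string column with 1,000 shuffled values but only 40 distinct ones:
val totalValues = 1000.0
val distinctValues = 40.0
val ratio = totalValues / distinctValues // 25.0
val threshold = 10.0 // spark.comet.shuffle.preferDictionary.ratio (default 10.0)
val preferDictionary = threshold > 1.0 && ratio > threshold // true: dictionary-encode this column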

@@ -377,7 +397,7 @@ object CometConf extends ShimCometConf {
   val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.debug.enabled")
       .doc(
-        "Whether to enable debug mode for Comet. By default, this config is false. " +
+        "Whether to enable debug mode for Comet. " +
           "When enabled, Comet will do additional checks for debugging purpose. For example, " +
           "validating array when importing arrays from JVM at native side. Note that these " +
           "checks may be expensive in performance and should only be enabled for debugging " +
@@ -437,27 +457,27 @@ object CometConf extends ShimCometConf {
       "The fraction of memory from Comet memory overhead that the native memory " +
         "manager can use for execution. The purpose of this config is to set aside memory for " +
         "untracked data structures, as well as imprecise size estimation during memory " +
-        "acquisition. Default value is 0.7.")
+        "acquisition.")
     .doubleConf
     .createWithDefault(0.7)

-  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf(
-    "spark.comet.parquet.enable.directBuffer")
-    .doc("Whether to use Java direct byte buffer when reading Parquet. By default, this is false")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.enable.directBuffer")
+      .doc("Whether to use Java direct byte buffer when reading Parquet.")
+      .booleanConf
+      .createWithDefault(false)

   val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.scan.preFetch.enabled")
-      .doc("Whether to enable pre-fetching feature of CometScan. By default is disabled.")
+      .doc("Whether to enable pre-fetching feature of CometScan.")
       .booleanConf
       .createWithDefault(false)

   val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.scan.preFetch.threadNum")
       .doc(
         "The number of threads running pre-fetching for CometScan. Effective if " +
-          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. By default it is 2. Note that more " +
+          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
           "pre-fetching threads means more memory requirement to store pre-fetched row groups.")
       .intConf
       .createWithDefault(2)
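Both pre-fetch keys appear verbatim in the hunk above; a short hedged sketch of enabling the feature on an existing SparkSession `spark`, bearing in mind the doc's warning that each extra thread buffers more row groups in memory:

spark.conf.set("spark.comet.scan.preFetch.enabled", "true")
spark.conf.set("spark.comet.scan.preFetch.threadNum", "4") // default is 2 per createWithDefault(2)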
