
Commit 19797f3

Merge branch 'main' into iceberg-rust

# Conflicts:
#	spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

2 parents: 40c9a07 + eeb1566

21 files changed: +91 −180 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 9 additions & 50 deletions

```diff
@@ -279,41 +279,13 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
+  val COMET_ONHEAP_MEMORY_OVERHEAD: ConfigEntry[Long] = conf("spark.comet.memoryOverhead")
     .category(CATEGORY_TESTING)
     .doc(
       "The amount of additional memory to be allocated per executor process for Comet, in MiB, " +
-        "when running Spark in on-heap mode. " +
-        "This config is optional. If this is not specified, it will be set to " +
-        s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. $TUNING_GUIDE.")
-    .internal()
+        "when running Spark in on-heap mode.")
     .bytesConf(ByteUnit.MiB)
-    .createOptional
-
-  val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
-    conf("spark.comet.memory.overhead.factor")
-      .category(CATEGORY_TESTING)
-      .doc(
-        "Fraction of executor memory to be allocated as additional memory for Comet " +
-          "when running Spark in on-heap mode. " +
-          s"$TUNING_GUIDE.")
-      .internal()
-      .doubleConf
-      .checkValue(
-        factor => factor > 0,
-        "Ensure that Comet memory overhead factor is a double greater than 0")
-      .createWithDefault(0.2)
-
-  val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = conf("spark.comet.memory.overhead.min")
-    .category(CATEGORY_TESTING)
-    .doc("Minimum amount of additional memory to be allocated per executor process for Comet, " +
-      s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
-    .internal()
-    .bytesConf(ByteUnit.MiB)
-    .checkValue(
-      _ >= 0,
-      "Ensure that Comet memory overhead min is a long greater than or equal to 0")
-    .createWithDefault(384)
+    .createWithDefault(1024)
 
   val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
@@ -436,18 +408,8 @@ object CometConf extends ShimCometConf {
       .intConf
       .createWithDefault(Int.MaxValue)
 
-  val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
-    conf("spark.comet.columnar.shuffle.memorySize")
-      .internal()
-      .category(CATEGORY_TESTING)
-      .doc("Amount of memory to reserve for columnar shuffle when running in on-heap mode. " +
-        s"$TUNING_GUIDE.")
-      .bytesConf(ByteUnit.MiB)
-      .createOptional
-
-  val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
+  val COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
     conf("spark.comet.columnar.shuffle.memory.factor")
-      .internal()
       .category(CATEGORY_TESTING)
       .doc("Fraction of Comet memory to be allocated per executor process for columnar shuffle " +
         s"when running in on-heap mode. $TUNING_GUIDE.")
@@ -534,7 +496,6 @@ object CometConf extends ShimCometConf {
       .category(CATEGORY_EXEC_EXPLAIN)
       .doc("When this setting is enabled, Comet will log all plan transformations performed " +
         "in physical optimizer rules. Default: false")
-      .internal()
       .booleanConf
       .createWithDefault(false)
 
@@ -569,15 +530,14 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
+  val COMET_ONHEAP_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.onHeap.enabled")
       .category(CATEGORY_TESTING)
       .doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
-      .internal()
       .booleanConf
       .createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)
 
-  val COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
+  val COMET_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
     conf("spark.comet.exec.memoryPool")
       .category(CATEGORY_TUNING)
       .doc(
@@ -587,19 +547,18 @@ object CometConf extends ShimCometConf {
       .stringConf
       .createWithDefault("fair_unified")
 
-  val COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
+  val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
     "spark.comet.exec.onHeap.memoryPool")
-    .category(CATEGORY_TUNING)
+    .category(CATEGORY_TESTING)
     .doc(
       "The type of memory pool to be used for Comet native execution " +
        "when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, " +
        "`greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, " +
        "and `unbounded`.")
-    .internal()
    .stringConf
    .createWithDefault("greedy_task_shared")
 
-  val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
+  val COMET_OFFHEAP_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
     conf("spark.comet.exec.memoryPool.fraction")
       .category(CATEGORY_TUNING)
       .doc(
```
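These hunks collapse three overlapping on-heap memory settings (`spark.comet.memoryOverhead`, `spark.comet.memory.overhead.factor`, and `spark.comet.memory.overhead.min`) into a single entry with a fixed 1024 MiB default. A minimal sketch of how a test session might set it, assuming the standard `CometPlugin` setup; the app name and override value are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: with the factor/min configs gone, on-heap overhead is driven
// solely by `spark.comet.memoryOverhead` (parsed in MiB, default 1024).
val spark = SparkSession
  .builder()
  .appName("comet-onheap-sketch") // hypothetical name
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.onHeap.enabled", "true") // on-heap mode is intended for testing
  .config("spark.comet.memoryOverhead", "2048") // override the 1024 MiB default
  .getOrCreate()
```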

docs/source/user-guide/latest/configs.md

Lines changed: 13 additions & 0 deletions

```diff
@@ -84,6 +84,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | Config | Description | Default Value |
 |--------|-------------|---------------|
 | `spark.comet.explain.native.enabled` | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
+| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
 | `spark.comet.explain.verbose.enabled` | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
 | `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
@@ -120,6 +121,18 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.tracing.enabled` | Enable fine-grained tracing of events and memory usage. For more information, refer to the [Comet Tracing Guide](https://datafusion.apache.org/comet/user-guide/tracing.html). | false |
 <!--END:CONFIG_TABLE-->
 
+## Development & Testing Settings
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[testing]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
+| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
+| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
+| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
+<!--END:CONFIG_TABLE-->
+
 ## Enabling or Disabling Individual Operators
 
 <!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
```

native/core/src/execution/jni_api.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -43,6 +43,7 @@ use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
 use datafusion_spark::function::datetime::date_add::SparkDateAdd;
 use datafusion_spark::function::datetime::date_sub::SparkDateSub;
+use datafusion_spark::function::hash::sha1::SparkSha1;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
 use datafusion_spark::function::string::char::CharFunc;
@@ -332,6 +333,7 @@ fn prepare_datafusion_session_context(
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
 
     // Must be the last one to override existing functions with the same name
     datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;
```
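With `SparkSha1` registered in the native session context (and mapped in `QueryPlanSerde` below), a plain `sha1` call in Spark SQL can execute natively. A hedged usage sketch, reusing the `spark` session from the earlier example:

```scala
// Returns the 40-character lowercase hex SHA-1 digest, matching Spark's
// built-in behavior; if Comet cannot convert the expression, it simply
// falls back to Spark's implementation.
val digest = spark.sql("SELECT sha1('Apache Comet') AS digest")
digest.show(truncate = false)
```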

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -273,10 +273,10 @@ object CometExecIterator extends Logging {
     if (offHeapMode) {
       // in off-heap mode, Comet uses unified memory management to share off-heap memory with Spark
       val offHeapSize = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
-      val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
+      val memoryFraction = CometConf.COMET_OFFHEAP_MEMORY_POOL_FRACTION.get()
       val memoryLimit = (offHeapSize * memoryFraction).toLong
       val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
-      val memoryPoolType = COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE.get()
+      val memoryPoolType = COMET_OFFHEAP_MEMORY_POOL_TYPE.get()
       logInfo(
         s"memoryPoolType=$memoryPoolType, " +
           s"offHeapSize=${toMB(offHeapSize)}, " +
@@ -291,7 +291,7 @@ object CometExecIterator extends Logging {
       // example 16GB maxMemory * 16 cores with 4 cores per task results
       // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
       val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
-      val memoryPoolType = COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE.get()
+      val memoryPoolType = COMET_ONHEAP_MEMORY_POOL_TYPE.get()
       logInfo(
         s"memoryPoolType=$memoryPoolType, " +
           s"memoryLimit=${toMB(memoryLimit)}, " +
```

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 8 additions & 31 deletions

```diff
@@ -243,46 +243,25 @@ object CometSparkSessionExtensions extends Logging {
   }
 
   /**
-   * Calculates required memory overhead in MB per executor process for Comet when running in
+   * Determines required memory overhead in MB per executor process for Comet when running in
    * on-heap mode.
-   *
-   * If `COMET_MEMORY_OVERHEAD` is defined then that value will be used, otherwise the overhead
-   * will be calculated by multiplying executor memory (`spark.executor.memory`) by
-   * `COMET_MEMORY_OVERHEAD_FACTOR`.
-   *
-   * In either case, a minimum value of `COMET_MEMORY_OVERHEAD_MIN_MIB` will be returned.
    */
   def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
     if (isOffHeapEnabled(sparkConf)) {
       // when running in off-heap mode we use unified memory management to share
       // off-heap memory with Spark so do not add overhead
       return 0
     }
-
-    // `spark.executor.memory` default value is 1g
-    val baseMemoryMiB = ConfigHelpers
-      .byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), ByteUnit.MiB)
-
-    val cometMemoryOverheadMinAsString = sparkConf.get(
-      COMET_MEMORY_OVERHEAD_MIN_MIB.key,
-      COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString)
-
-    val minimum = ConfigHelpers.byteFromString(cometMemoryOverheadMinAsString, ByteUnit.MiB)
-    val overheadFactor = getDoubleConf(sparkConf, COMET_MEMORY_OVERHEAD_FACTOR)
-
-    val overHeadMemFromConf = sparkConf
-      .getOption(COMET_MEMORY_OVERHEAD.key)
-      .map(ConfigHelpers.byteFromString(_, ByteUnit.MiB))
-
-    overHeadMemFromConf.getOrElse(math.max((overheadFactor * baseMemoryMiB).toLong, minimum))
+    ConfigHelpers.byteFromString(
+      sparkConf.get(
+        COMET_ONHEAP_MEMORY_OVERHEAD.key,
+        COMET_ONHEAP_MEMORY_OVERHEAD.defaultValueString),
+      ByteUnit.MiB)
   }
 
   private def getBooleanConf(conf: SparkConf, entry: ConfigEntry[Boolean]) =
     conf.getBoolean(entry.key, entry.defaultValue.get)
 
-  private def getDoubleConf(conf: SparkConf, entry: ConfigEntry[Double]) =
-    conf.getDouble(entry.key, entry.defaultValue.get)
-
   /**
    * Calculates required memory overhead in bytes per executor process for Comet when running in
    * on-heap mode.
@@ -300,11 +279,9 @@ object CometSparkSessionExtensions extends Logging {
 
     val cometMemoryOverhead = getCometMemoryOverheadInMiB(sparkConf)
 
-    val overheadFactor = COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.get(conf)
-    val cometShuffleMemoryFromConf = COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.get(conf)
+    val overheadFactor = COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR.get(conf)
 
-    val shuffleMemorySize =
-      cometShuffleMemoryFromConf.getOrElse((overheadFactor * cometMemoryOverhead).toLong)
+    val shuffleMemorySize = (overheadFactor * cometMemoryOverhead).toLong
     if (shuffleMemorySize > cometMemoryOverhead) {
       logWarning(
         s"Configured shuffle memory size $shuffleMemorySize is larger than Comet memory overhead " +
```

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 8 additions & 1 deletion

```diff
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.catalyst.expressions.Cast
 
+import org.apache.comet.CometConf.COMET_ONHEAP_MEMORY_OVERHEAD
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
 import org.apache.comet.serde.{Compatible, Incompatible, QueryPlanSerde}
 
@@ -78,7 +79,13 @@ object GenerateDocs {
       if (conf.defaultValue.isEmpty) {
         w.write(s"| `${conf.key}` | $doc | |\n".getBytes)
       } else {
-        w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
+        val isBytesConf = conf.key == COMET_ONHEAP_MEMORY_OVERHEAD.key
+        if (isBytesConf) {
+          val bytes = conf.defaultValue.get.asInstanceOf[Long]
+          w.write(s"| `${conf.key}` | $doc | $bytes MiB |\n".getBytes)
+        } else {
+          w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
+        }
       }
     }
   }
```

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -157,7 +157,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Md5] -> CometScalarFunction("md5"),
     classOf[Murmur3Hash] -> CometMurmur3Hash,
     classOf[Sha2] -> CometSha2,
-    classOf[XxHash64] -> CometXxHash64)
+    classOf[XxHash64] -> CometXxHash64,
+    classOf[Sha1] -> CometSha1)
 
   private val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[Ascii] -> CometScalarFunction("ascii"),
```

spark/src/main/scala/org/apache/comet/serde/hash.scala

Lines changed: 15 additions & 1 deletion

```diff
@@ -19,7 +19,7 @@
 
 package org.apache.comet.serde
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha2, XxHash64}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha1, Sha2, XxHash64}
 import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, StringType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -89,6 +89,20 @@ object CometSha2 extends CometExpressionSerde[Sha2] {
   }
 }
 
+object CometSha1 extends CometExpressionSerde[Sha1] {
+  override def convert(
+      expr: Sha1,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    if (!HashUtils.isSupportedType(expr)) {
+      withInfo(expr, s"HashUtils doesn't support dataType: ${expr.child.dataType}")
+      return None
+    }
+    val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+    scalarFunctionExprToProtoWithReturnType("sha1", StringType, false, childExpr)
+  }
+}
+
 private object HashUtils {
   def isSupportedType(expr: Expression): Boolean = {
     for (child <- expr.children) {
```
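`CometSha1` only converts when `HashUtils.isSupportedType` accepts the child type; otherwise it records a fallback reason via `withInfo` and returns `None`, leaving Spark to evaluate the expression. A short DataFrame-level sketch of the supported path, assuming a Comet-enabled `spark` session:

```scala
import org.apache.spark.sql.functions.{col, sha1}

// sha1 over a string column is eligible for native conversion; an
// unsupported child type would make convert() return None and fall
// back to Spark (reason visible with spark.comet.explainFallback.enabled).
val strings = spark.range(3).selectExpr("CAST(id AS STRING) AS s")
strings.select(sha1(col("s")).as("digest")).show(truncate = false)
```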

spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -117,7 +117,7 @@ object FuzzDataGenerator {
     // generate schema using random data types
     val fields = dataTypes.zipWithIndex
       .map(i => StructField(s"c${i._2}", i._1, nullable = true))
-    val schema = StructType(fields)
+    val schema = StructType(fields.toSeq)
 
     // generate columnar data
     val cols: Seq[Seq[Any]] =
@@ -147,7 +147,7 @@ object FuzzDataGenerator {
             list += Range(0, r.nextInt(5)).map(j => values((i + j) % values.length)).toArray
           }
         }
-        list
+        list.toSeq
       case StructType(fields) =>
         val values = fields.map(f => generateColumn(r, f.dataType, numRows, options))
         Range(0, numRows).map(i => Row(values.indices.map(j => values(j)(i)): _*))
```
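The two `.toSeq` calls are cross-build fixes rather than behavior changes: under Scala 2.13, `scala.Seq` is an alias for `immutable.Seq`, so a mutable buffer no longer satisfies a `Seq` result type implicitly. A minimal illustration of the 2.13 rule:

```scala
import scala.collection.mutable.ListBuffer

// Compiles on Scala 2.12 without .toSeq, but on 2.13 scala.Seq means
// scala.collection.immutable.Seq, so the buffer must be converted.
def build(): Seq[Int] = {
  val buf = ListBuffer(1, 2, 3)
  buf.toSeq // omitting .toSeq is a type error on 2.13
}
```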

spark/src/main/scala/org/apache/spark/Plugins.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
 import org.apache.spark.sql.internal.StaticSQLConf
 
-import org.apache.comet.CometConf.COMET_ENABLE_ONHEAP_MODE
+import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
 import org.apache.comet.CometSparkSessionExtensions
 
 /**
@@ -49,7 +49,7 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
     logInfo("CometDriverPlugin init")
 
     if (!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf) &&
-      !sc.getConf.getBoolean(COMET_ENABLE_ONHEAP_MODE.key, false)) {
+      !sc.getConf.getBoolean(COMET_ONHEAP_ENABLED.key, false)) {
       logWarning("Comet plugin is disabled because Spark is not running in off-heap mode.")
       return Collections.emptyMap[String, String]
     }
```
