
Commit 3ed034d

Merge remote-tracking branch 'origin/config-env-var' into test-spark-sql-coverage
2 parents 532d734 + 21fba63

5 files changed: +76 −29 lines


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 46 additions & 11 deletions
@@ -84,10 +84,9 @@ object CometConf extends ShimCometConf
     .doc(
       "Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
         "Comet to read Parquet data source. Note that to enable native vectorized execution, " +
-        "both this config and `spark.comet.exec.enabled` need to be enabled. By default, this " +
-        "config is the value of the env var `ENABLE_COMET` if set, or true otherwise.")
+        "both this config and `spark.comet.exec.enabled` need to be enabled.")
     .booleanConf
-    .createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean)
+    .createWithEnvVarOrDefault("ENABLE_COMET", true)

   val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
     .category(CATEGORY_SCAN)

@@ -119,9 +118,7 @@ object CometConf extends ShimCometConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(
       Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
-    .createWithDefault(sys.env
-      .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
-      .toLowerCase(Locale.ROOT))
+    .createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)

   val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.respectFilterPushdown")

@@ -493,8 +490,7 @@ object CometConf extends ShimCometConf
     .category(CATEGORY_EXEC_EXPLAIN)
     .doc("When this setting is enabled, Comet will log warnings for all fallback reasons.")
     .booleanConf
-    .createWithDefault(
-      sys.env.getOrElse("ENABLE_COMET_LOG_FALLBACK_REASONS", "false").toBoolean)
+    .createWithEnvVarOrDefault("ENABLE_COMET_LOG_FALLBACK_REASONS", false)

   val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explainFallback.enabled")

@@ -524,7 +520,7 @@ object CometConf extends ShimCometConf
     .category(CATEGORY_TESTING)
     .doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
     .booleanConf
-    .createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)
+    .createWithEnvVarOrDefault("ENABLE_COMET_ONHEAP", false)

   val COMET_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
     conf("spark.comet.exec.memoryPool")

@@ -707,7 +703,7 @@ object CometConf extends ShimCometConf
     .doc("Experimental option to enable strict testing, which will fail tests that could be " +
       "more comprehensive, such as checking for a specific fallback reason")
     .booleanConf
-    .createWithDefault(sys.env.getOrElse("ENABLE_COMET_STRICT_TESTING", "false").toBoolean)
+    .createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)

   /** Create a config to enable a specific operator */
   private def createExecEnabledConfig(

@@ -865,6 +861,37 @@ private class TypedConfigBuilder[T](
     CometConf.register(conf)
     conf
   }
+
+  /**
+   * Creates a [[ConfigEntry]] that has a default value, with support for environment variable
+   * override.
+   *
+   * The value is resolved in the following priority order:
+   * 1. Spark config value (if set) 2. Environment variable value (if set) 3. Default value
+   *
+   * @param envVar
+   *   The environment variable name to check for override value
+   * @param default
+   *   The default value to use if neither config nor env var is set
+   * @return
+   *   A ConfigEntry with environment variable support
+   */
+  def createWithEnvVarOrDefault(envVar: String, default: T): ConfigEntry[T] = {
+    val defaultValue = sys.env.get(envVar).map(converter).getOrElse(default)
+    val transformedDefault = converter(stringConverter(defaultValue))
+    val conf = new ConfigEntryWithDefault[T](
+      parent.key,
+      transformedDefault,
+      converter,
+      stringConverter,
+      parent._doc,
+      parent._category,
+      parent._public,
+      parent._version,
+      Some(envVar))
+    CometConf.register(conf)
+    conf
+  }
 }

 private[comet] abstract class ConfigEntry[T](

@@ -892,6 +919,11 @@ private[comet] abstract class ConfigEntry[T](

   def defaultValueString: String

+  /**
+   * The environment variable name that can override this config's default value, if applicable.
+   */
+  def envVar: Option[String] = None
+
   override def toString: String = {
     s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
       s"public=$isPublic, version=$version)"

@@ -906,12 +938,15 @@ private[comet] class ConfigEntryWithDefault[T](
     doc: String,
     category: String,
     isPublic: Boolean,
-    version: String)
+    version: String,
+    _envVar: Option[String] = None)
   extends ConfigEntry(key, valueConverter, stringConverter, doc, category, isPublic, version) {
   override def defaultValue: Option[T] = Some(_defaultValue)

   override def defaultValueString: String = stringConverter(_defaultValue)

+  override def envVar: Option[String] = _envVar
+
   def get(conf: SQLConf): T = {
     val tmp = conf.getConfString(key, null)
     if (tmp == null) {
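
The new builder method centralizes the env-var fallback that was previously inlined at each call site. Below is a minimal standalone sketch of the resolution order it implements, with simplified types (`EnvVarDefaultSketch` and `resolveDefault` are illustrative names, not part of Comet's API):

```scala
// Minimal sketch of the env-var-or-default resolution used by createWithEnvVarOrDefault.
// Comet's real builder threads converters and registration through TypedConfigBuilder;
// this only illustrates how the registered default is chosen.
object EnvVarDefaultSketch {
  // Environment variable (if set) wins over the hard-coded default.
  def resolveDefault[T](envVar: String, default: T)(convert: String => T): T =
    sys.env.get(envVar).map(convert).getOrElse(default)

  def main(args: Array[String]): Unit = {
    // With ENABLE_COMET unset this prints true; with ENABLE_COMET=false exported it prints false.
    val cometEnabledDefault = resolveDefault("ENABLE_COMET", default = true)(_.toBoolean)
    println(s"spark.comet.enabled default resolves to $cometEnabledDefault")
  }
}
```

An explicit `spark.comet.*` conf value still wins at lookup time, since the env var only changes the registered default that `ConfigEntryWithDefault.get` falls back to.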

docs/source/user-guide/latest/configs.md

Lines changed: 4 additions & 4 deletions
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
 | `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
 | `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
 | `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true |
-| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
+| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. Can be overridden by environment variable `ENABLE_COMET`. | true |
 | `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
 | `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator_name>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
 | `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |

@@ -86,7 +86,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
 | `spark.comet.explain.verbose.enabled` | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
 | `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
-| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
+| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. Can be overridden by environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |
 <!--END:CONFIG_TABLE-->

 ## Shuffle Configuration Settings

@@ -127,10 +127,10 @@ These settings can be used to determine which parts of the plan are accelerated
 | Config | Description | Default Value |
 |--------|-------------|---------------|
 | `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
-| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
+| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. Can be overridden by environment variable `ENABLE_COMET_ONHEAP`. | false |
 | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
 | `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
-| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason | false |
+| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. Can be overridden by environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
 <!--END:CONFIG_TABLE-->

 ## Enabling or Disabling Individual Operators
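
As context for the table entries above, the conf/env-var interplay can be exercised as follows. This is a hedged sketch using the standard Spark API; it assumes the Comet jars and plugin are already configured, which is outside the scope of this snippet:

```scala
// Sketch: setting spark.comet.enabled explicitly via the Spark conf.
// If the conf is left unset, the default now comes from the ENABLE_COMET env var (when exported),
// otherwise it falls back to true.
import org.apache.spark.sql.SparkSession

object CometEnableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("comet-enable-sketch")
      .config("spark.comet.enabled", "true") // explicit conf value takes precedence over ENABLE_COMET
      .getOrCreate()
    println(spark.conf.get("spark.comet.enabled"))
    spark.stop()
  }
}
```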

fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala

Lines changed: 7 additions & 3 deletions
@@ -31,6 +31,8 @@ class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments)
       opt[String](required = true, descr = "Folder with Spark produced results in Parquet format")
     val inputCometFolder: ScallopOption[String] =
       opt[String](required = true, descr = "Folder with Comet produced results in Parquet format")
+    val tolerance: ScallopOption[Double] =
+      opt[Double](default = Some(0.000002), descr = "Tolerance for floating point comparisons")
   }
   addSubcommand(compareParquet)
   verify()

@@ -49,7 +51,8 @@
         compareParquetFolders(
           spark,
           conf.compareParquet.inputSparkFolder(),
-          conf.compareParquet.inputCometFolder())
+          conf.compareParquet.inputCometFolder(),
+          conf.compareParquet.tolerance())

       case _ =>
         // scalastyle:off println

@@ -62,7 +65,8 @@
   private def compareParquetFolders(
       spark: SparkSession,
       sparkFolderPath: String,
-      cometFolderPath: String): Unit = {
+      cometFolderPath: String,
+      tolerance: Double): Unit = {

     val output = QueryRunner.createOutputMdFile()

@@ -115,7 +119,7 @@
       val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect()

       // Compare the results
-      if (QueryComparison.assertSameRows(sparkRows, cometRows, output)) {
+      if (QueryComparison.assertSameRows(sparkRows, cometRows, output, tolerance)) {
         output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
       } else {
         // Output schema if dataframes are not equal
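
The `tolerance` option defined above follows standard Scallop conventions, so it should surface as a `--tolerance` command-line flag and default to 0.000002 when omitted. A small standalone sketch of just that option, assuming the same Scallop library (the class and object names here are illustrative, not the real `ComparisonToolConf`):

```scala
// Standalone sketch mirroring the new tolerance option from the diff.
import org.rogach.scallop._

class ToleranceConfSketch(args: Seq[String]) extends ScallopConf(args) {
  val tolerance: ScallopOption[Double] =
    opt[Double](default = Some(0.000002), descr = "Tolerance for floating point comparisons")
  verify()
}

object ToleranceConfSketchApp {
  def main(args: Array[String]): Unit = {
    // Passing `--tolerance 1e-4` overrides the default; with no flag, tolerance() is 0.000002.
    val conf = new ToleranceConfSketch(args.toSeq)
    println(s"tolerance = ${conf.tolerance()}")
  }
}
```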

fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala

Lines changed: 9 additions & 8 deletions
@@ -148,7 +148,8 @@ object QueryComparison {
   def assertSameRows(
       sparkRows: Array[Row],
       cometRows: Array[Row],
-      output: BufferedWriter): Boolean = {
+      output: BufferedWriter,
+      tolerance: Double = 0.000001): Boolean = {
     if (sparkRows.length == cometRows.length) {
       var i = 0
       while (i < sparkRows.length) {

@@ -164,7 +165,7 @@

         assert(l.length == r.length)
         for (j <- 0 until l.length) {
-          if (!same(l(j), r(j))) {
+          if (!same(l(j), r(j), tolerance)) {
             output.write(s"First difference at row $i:\n")
             output.write("Spark: `" + formatRow(l) + "`\n")
             output.write("Comet: `" + formatRow(r) + "`\n")

@@ -186,7 +187,7 @@
     true
   }

-  private def same(l: Any, r: Any): Boolean = {
+  private def same(l: Any, r: Any, tolerance: Double): Boolean = {
     if (l == null || r == null) {
       return l == null && r == null
     }

@@ -195,20 +196,20 @@
       case (a: Float, b: Float) if a.isNegInfinity => b.isNegInfinity
       case (a: Float, b: Float) if a.isInfinity => b.isInfinity
       case (a: Float, b: Float) if a.isNaN => b.isNaN
-      case (a: Float, b: Float) => (a - b).abs <= 0.000001f
+      case (a: Float, b: Float) => (a - b).abs <= tolerance
       case (a: Double, b: Double) if a.isPosInfinity => b.isPosInfinity
      case (a: Double, b: Double) if a.isNegInfinity => b.isNegInfinity
       case (a: Double, b: Double) if a.isInfinity => b.isInfinity
       case (a: Double, b: Double) if a.isNaN => b.isNaN
-      case (a: Double, b: Double) => (a - b).abs <= 0.000001
+      case (a: Double, b: Double) => (a - b).abs <= tolerance
       case (a: Array[_], b: Array[_]) =>
-        a.length == b.length && a.zip(b).forall(x => same(x._1, x._2))
+        a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance))
       case (a: mutable.WrappedArray[_], b: mutable.WrappedArray[_]) =>
-        a.length == b.length && a.zip(b).forall(x => same(x._1, x._2))
+        a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance))
       case (a: Row, b: Row) =>
         val aa = a.toSeq
         val bb = b.toSeq
-        aa.length == bb.length && aa.zip(bb).forall(x => same(x._1, x._2))
+        aa.length == bb.length && aa.zip(bb).forall(x => same(x._1, x._2, tolerance))
       case (a, b) => a == b
     }
   }
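
The net effect is that the previously hard-coded 1e-6 epsilon becomes a parameter. A minimal sketch of the numeric path only, assuming simplified inputs (the real `same` also recurses into arrays, `WrappedArray`, and nested `Row` values):

```scala
// Sketch: tolerance-based equality for doubles, matching the special-case ordering in `same`
// (NaN and infinities are handled before the epsilon check applies).
object ToleranceCompareSketch {
  def sameDouble(a: Double, b: Double, tolerance: Double): Boolean =
    if (a.isNaN || b.isNaN) a.isNaN && b.isNaN
    else if (a.isInfinity || b.isInfinity) a == b
    else (a - b).abs <= tolerance

  def main(args: Array[String]): Unit = {
    println(sameDouble(1.0000001, 1.0000015, 0.000002))   // true: difference within tolerance
    println(sameDouble(1.0, 1.00001, 0.000002))           // false: difference exceeds tolerance
    println(sameDouble(Double.NaN, Double.NaN, 0.000002)) // true: NaN treated as equal to NaN
  }
}
```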

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 10 additions & 3 deletions
@@ -74,15 +74,22 @@ object GenerateDocs {
         // convert links to Markdown
         val doc =
           urlPattern.replaceAllIn(conf.doc.trim, m => s"[Comet ${m.group(1)} Guide](")
+        // append env var info if present
+        val docWithEnvVar = conf.envVar match {
+          case Some(envVarName) =>
+            s"$doc Can be overridden by environment variable `$envVarName`."
+          case None => doc
+        }
         if (conf.defaultValue.isEmpty) {
-          w.write(s"| `${conf.key}` | $doc | |\n".getBytes)
+          w.write(s"| `${conf.key}` | $docWithEnvVar | |\n".getBytes)
         } else {
           val isBytesConf = conf.key == COMET_ONHEAP_MEMORY_OVERHEAD.key
           if (isBytesConf) {
             val bytes = conf.defaultValue.get.asInstanceOf[Long]
-            w.write(s"| `${conf.key}` | $doc | $bytes MiB |\n".getBytes)
+            w.write(s"| `${conf.key}` | $docWithEnvVar | $bytes MiB |\n".getBytes)
           } else {
-            w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
+            val defaultVal = conf.defaultValueString
+            w.write(s"| `${conf.key}` | $docWithEnvVar | $defaultVal |\n".getBytes)
           }
         }
       }
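
The generated table rows now carry the env-var note whenever `conf.envVar` is defined. A standalone sketch of just that string transformation (the real code writes complete Markdown rows to an output stream; `EnvVarDocSketch` is an illustrative name):

```scala
// Sketch: append the environment-variable note to a config's doc text when one is declared.
object EnvVarDocSketch {
  def docWithEnvVar(doc: String, envVar: Option[String]): String = envVar match {
    case Some(name) => s"$doc Can be overridden by environment variable `$name`."
    case None       => doc
  }

  def main(args: Array[String]): Unit = {
    // Prints the doc text followed by the ENABLE_COMET_ONHEAP note, as it appears in configs.md.
    println(docWithEnvVar(
      "Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.",
      Some("ENABLE_COMET_ONHEAP")))
  }
}
```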
