Skip to content

Commit eaee24c

Browse files
authored
chore: Improve framework for specifying that configs can be set with env vars (#2722)
1 parent f56006a commit eaee24c

File tree

3 files changed

+60
-19
lines changed

3 files changed

+60
-19
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,9 @@ object CometConf extends ShimCometConf {
8484
.doc(
8585
"Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
8686
"Comet to read Parquet data source. Note that to enable native vectorized execution, " +
87-
"both this config and `spark.comet.exec.enabled` need to be enabled. By default, this " +
88-
"config is the value of the env var `ENABLE_COMET` if set, or true otherwise.")
87+
"both this config and `spark.comet.exec.enabled` need to be enabled.")
8988
.booleanConf
90-
.createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean)
89+
.createWithEnvVarOrDefault("ENABLE_COMET", true)
9190

9291
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
9392
.category(CATEGORY_SCAN)
@@ -119,9 +118,7 @@ object CometConf extends ShimCometConf {
119118
.transform(_.toLowerCase(Locale.ROOT))
120119
.checkValues(
121120
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
122-
.createWithDefault(sys.env
123-
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
124-
.toLowerCase(Locale.ROOT))
121+
.createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
125122

126123
val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
127124
conf("spark.comet.parquet.respectFilterPushdown")
@@ -493,8 +490,7 @@ object CometConf extends ShimCometConf {
493490
.category(CATEGORY_EXEC_EXPLAIN)
494491
.doc("When this setting is enabled, Comet will log warnings for all fallback reasons.")
495492
.booleanConf
496-
.createWithDefault(
497-
sys.env.getOrElse("ENABLE_COMET_LOG_FALLBACK_REASONS", "false").toBoolean)
493+
.createWithEnvVarOrDefault("ENABLE_COMET_LOG_FALLBACK_REASONS", false)
498494

499495
val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
500496
conf("spark.comet.explainFallback.enabled")
@@ -524,7 +520,7 @@ object CometConf extends ShimCometConf {
524520
.category(CATEGORY_TESTING)
525521
.doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
526522
.booleanConf
527-
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)
523+
.createWithEnvVarOrDefault("ENABLE_COMET_ONHEAP", false)
528524

529525
val COMET_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
530526
conf("spark.comet.exec.memoryPool")
@@ -705,9 +701,9 @@ object CometConf extends ShimCometConf {
705701
val COMET_STRICT_TESTING: ConfigEntry[Boolean] = conf(s"$COMET_PREFIX.testing.strict")
706702
.category(CATEGORY_TESTING)
707703
.doc("Experimental option to enable strict testing, which will fail tests that could be " +
708-
"more comprehensive, such as checking for a specific fallback reason")
704+
"more comprehensive, such as checking for a specific fallback reason.")
709705
.booleanConf
710-
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_STRICT_TESTING", "false").toBoolean)
706+
.createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)
711707

712708
/** Create a config to enable a specific operator */
713709
private def createExecEnabledConfig(
@@ -865,6 +861,36 @@ private class TypedConfigBuilder[T](
865861
CometConf.register(conf)
866862
conf
867863
}
864+
865+
/**
866+
* Creates a [[ConfigEntry]] that has a default value, with support for environment variable
867+
* override.
868+
*
869+
* The value is resolved in the following priority order:
870+
* 1. Spark config value (if set) 2. Environment variable value (if set) 3. Default value
871+
*
872+
* @param envVar
873+
* The environment variable name to check for override value
874+
* @param default
875+
* The default value to use if neither config nor env var is set
876+
* @return
877+
* A ConfigEntry with environment variable support
878+
*/
879+
def createWithEnvVarOrDefault(envVar: String, default: T): ConfigEntry[T] = {
880+
val transformedDefault = converter(sys.env.getOrElse(envVar, stringConverter(default)))
881+
val conf = new ConfigEntryWithDefault[T](
882+
parent.key,
883+
transformedDefault,
884+
converter,
885+
stringConverter,
886+
parent._doc,
887+
parent._category,
888+
parent._public,
889+
parent._version,
890+
Some(envVar))
891+
CometConf.register(conf)
892+
conf
893+
}
868894
}
869895

870896
private[comet] abstract class ConfigEntry[T](
@@ -892,6 +918,11 @@ private[comet] abstract class ConfigEntry[T](
892918

893919
def defaultValueString: String
894920

921+
/**
922+
* The environment variable name that can override this config's default value, if applicable.
923+
*/
924+
def envVar: Option[String] = None
925+
895926
override def toString: String = {
896927
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
897928
s"public=$isPublic, version=$version)"
@@ -906,12 +937,15 @@ private[comet] class ConfigEntryWithDefault[T](
906937
doc: String,
907938
category: String,
908939
isPublic: Boolean,
909-
version: String)
940+
version: String,
941+
_envVar: Option[String] = None)
910942
extends ConfigEntry(key, valueConverter, stringConverter, doc, category, isPublic, version) {
911943
override def defaultValue: Option[T] = Some(_defaultValue)
912944

913945
override def defaultValueString: String = stringConverter(_defaultValue)
914946

947+
override def envVar: Option[String] = _envVar
948+
915949
def get(conf: SQLConf): T = {
916950
val tmp = conf.getConfString(key, null)
917951
if (tmp == null) {

docs/source/user-guide/latest/configs.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
6363
| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
6464
| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
6565
| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true |
66-
| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
66+
| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. Can be overridden by environment variable `ENABLE_COMET`. | true |
6767
| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
6868
| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator_name>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
6969
| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
@@ -86,7 +86,7 @@ These settings can be used to determine which parts of the plan are accelerated
8686
| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
8787
| `spark.comet.explain.verbose.enabled` | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
8888
| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
89-
| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
89+
| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. Can be overridden by environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |
9090
<!--END:CONFIG_TABLE-->
9191

9292
## Shuffle Configuration Settings
@@ -127,10 +127,10 @@ These settings can be used to determine which parts of the plan are accelerated
127127
| Config | Description | Default Value |
128128
|--------|-------------|---------------|
129129
| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
130-
| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
130+
| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. Can be overridden by environment variable `ENABLE_COMET_ONHEAP`. | false |
131131
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
132132
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
133-
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason | false |
133+
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. Can be overridden by environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
134134
<!--END:CONFIG_TABLE-->
135135

136136
## Enabling or Disabling Individual Operators

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,22 @@ object GenerateDocs {
7474
// convert links to Markdown
7575
val doc =
7676
urlPattern.replaceAllIn(conf.doc.trim, m => s"[Comet ${m.group(1)} Guide](")
77+
// append env var info if present
78+
val docWithEnvVar = conf.envVar match {
79+
case Some(envVarName) =>
80+
s"$doc Can be overridden by environment variable `$envVarName`."
81+
case None => doc
82+
}
7783
if (conf.defaultValue.isEmpty) {
78-
w.write(s"| `${conf.key}` | $doc | |\n".getBytes)
84+
w.write(s"| `${conf.key}` | $docWithEnvVar | |\n".getBytes)
7985
} else {
8086
val isBytesConf = conf.key == COMET_ONHEAP_MEMORY_OVERHEAD.key
8187
if (isBytesConf) {
8288
val bytes = conf.defaultValue.get.asInstanceOf[Long]
83-
w.write(s"| `${conf.key}` | $doc | $bytes MiB |\n".getBytes)
89+
w.write(s"| `${conf.key}` | $docWithEnvVar | $bytes MiB |\n".getBytes)
8490
} else {
85-
w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
91+
val defaultVal = conf.defaultValueString
92+
w.write(s"| `${conf.key}` | $docWithEnvVar | $defaultVal |\n".getBytes)
8693
}
8794
}
8895
}

0 commit comments

Comments
 (0)