Skip to content

Commit f830005

Browse files
gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-23472][CORE] Add defaultJavaOptions for driver and executor.
## What changes were proposed in this pull request?

This PR adds two new config properties: `spark.driver.defaultJavaOptions` and `spark.executor.defaultJavaOptions`. These are intended to be set by administrators in a file of defaults for options like JVM garbage collection algorithm. Users will still set `extraJavaOptions` properties, and both sets of JVM options will be added to start a JVM (default options are prepended to extra options).

## How was this patch tested?

Existing + additional unit tests.

```
cd docs/
SKIP_API=1 jekyll build
```

Manual webpage check.

Closes apache#24804 from gaborgsomogyi/SPARK-23472.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent d47c219 commit f830005

File tree

12 files changed

+297
-41
lines changed

12 files changed

+297
-41
lines changed

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
2424
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
2525
import org.apache.spark.deploy.ClientArguments._
2626
import org.apache.spark.internal.config
27+
import org.apache.spark.launcher.SparkLauncher
2728
import org.apache.spark.rpc.RpcEndpointRef
2829
import org.apache.spark.util.Utils
2930

@@ -135,6 +136,7 @@ private[rest] class StandaloneSubmitRequestServlet(
135136
val sparkProperties = request.sparkProperties
136137
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
137138
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
139+
val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
138140
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
139141
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
140142
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
@@ -160,9 +162,11 @@ private[rest] class StandaloneSubmitRequestServlet(
160162
.set("spark.master", updatedMasters)
161163
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
162164
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
165+
val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString)
166+
.getOrElse(Seq.empty)
163167
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
164168
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
165-
val javaOpts = sparkJavaOpts ++ extraJavaOpts
169+
val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts
166170
val command = new Command(
167171
"org.apache.spark.deploy.worker.DriverWrapper",
168172
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper

core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ private[spark] class TypedConfigBuilder[T](
127127

128128
/** Creates a [[ConfigEntry]] that does not have a default value. */
129129
def createOptional: OptionalConfigEntry[T] = {
130-
val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
131-
stringConverter, parent._doc, parent._public)
130+
val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
131+
parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
132+
parent._public)
132133
parent._onCreate.foreach(_(entry))
133134
entry
134135
}
@@ -141,17 +142,19 @@ private[spark] class TypedConfigBuilder[T](
141142
createWithDefaultString(default.asInstanceOf[String])
142143
} else {
143144
val transformedDefault = converter(stringConverter(default))
144-
val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
145-
transformedDefault, converter, stringConverter, parent._doc, parent._public)
145+
val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
146+
parent._prependSeparator, parent._alternatives, transformedDefault, converter,
147+
stringConverter, parent._doc, parent._public)
146148
parent._onCreate.foreach(_(entry))
147149
entry
148150
}
149151
}
150152

151153
/** Creates a [[ConfigEntry]] with a function to determine the default value */
152154
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
153-
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
154-
converter, stringConverter, parent._doc, parent._public)
155+
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
156+
parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
157+
parent._doc, parent._public)
155158
parent._onCreate.foreach(_ (entry))
156159
entry
157160
}
@@ -161,8 +164,9 @@ private[spark] class TypedConfigBuilder[T](
161164
* [[String]] and must be a valid value for the entry.
162165
*/
163166
def createWithDefaultString(default: String): ConfigEntry[T] = {
164-
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
165-
converter, stringConverter, parent._doc, parent._public)
167+
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
168+
parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
169+
parent._doc, parent._public)
166170
parent._onCreate.foreach(_(entry))
167171
entry
168172
}
@@ -178,6 +182,8 @@ private[spark] case class ConfigBuilder(key: String) {
178182

179183
import ConfigHelpers._
180184

185+
private[config] var _prependedKey: Option[String] = None
186+
private[config] var _prependSeparator: String = ""
181187
private[config] var _public = true
182188
private[config] var _doc = ""
183189
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
@@ -202,24 +208,34 @@ private[spark] case class ConfigBuilder(key: String) {
202208
this
203209
}
204210

211+
def withPrepended(key: String, separator: String = " "): ConfigBuilder = {
212+
_prependedKey = Option(key)
213+
_prependSeparator = separator
214+
this
215+
}
216+
205217
def withAlternative(key: String): ConfigBuilder = {
206218
_alternatives = _alternatives :+ key
207219
this
208220
}
209221

210222
def intConf: TypedConfigBuilder[Int] = {
223+
checkPrependConfig
211224
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
212225
}
213226

214227
def longConf: TypedConfigBuilder[Long] = {
228+
checkPrependConfig
215229
new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
216230
}
217231

218232
def doubleConf: TypedConfigBuilder[Double] = {
233+
checkPrependConfig
219234
new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
220235
}
221236

222237
def booleanConf: TypedConfigBuilder[Boolean] = {
238+
checkPrependConfig
223239
new TypedConfigBuilder(this, toBoolean(_, key))
224240
}
225241

@@ -228,20 +244,30 @@ private[spark] case class ConfigBuilder(key: String) {
228244
}
229245

230246
def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
247+
checkPrependConfig
231248
new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
232249
}
233250

234251
def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
252+
checkPrependConfig
235253
new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
236254
}
237255

238256
def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
239-
val entry = new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
257+
val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
258+
_public, fallback)
240259
_onCreate.foreach(_(entry))
241260
entry
242261
}
243262

244263
def regexConf: TypedConfigBuilder[Regex] = {
264+
checkPrependConfig
245265
new TypedConfigBuilder(this, regexFromString(_, this.key), _.toString)
246266
}
267+
268+
private def checkPrependConfig = {
269+
if (_prependedKey.isDefined) {
270+
throw new IllegalArgumentException(s"$key type must be string if prepend used")
271+
}
272+
}
247273
}

core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ package org.apache.spark.internal.config
2828
* value declared as a string.
2929
*
3030
* @param key the key for the configuration
31+
* @param prependedKey the key for the configuration which will be prepended
32+
* @param prependSeparator the separator which is used for prepending
3133
* @param valueConverter how to convert a string to the value. It should throw an exception if the
3234
* string does not have the required format.
3335
* @param stringConverter how to convert a value to a string that the user can use it as a valid
@@ -41,6 +43,8 @@ package org.apache.spark.internal.config
4143
*/
4244
private[spark] abstract class ConfigEntry[T] (
4345
val key: String,
46+
val prependedKey: Option[String],
47+
val prependSeparator: String,
4448
val alternatives: List[String],
4549
val valueConverter: String => T,
4650
val stringConverter: T => String,
@@ -54,7 +58,15 @@ private[spark] abstract class ConfigEntry[T] (
5458
def defaultValueString: String
5559

5660
protected def readString(reader: ConfigReader): Option[String] = {
57-
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
61+
val values = Seq(
62+
prependedKey.flatMap(reader.get(_)),
63+
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
64+
).flatten
65+
if (values.nonEmpty) {
66+
Some(values.mkString(prependSeparator))
67+
} else {
68+
None
69+
}
5870
}
5971

6072
def readFrom(reader: ConfigReader): T
@@ -68,13 +80,24 @@ private[spark] abstract class ConfigEntry[T] (
6880

6981
private class ConfigEntryWithDefault[T] (
7082
key: String,
83+
prependedKey: Option[String],
84+
prependSeparator: String,
7185
alternatives: List[String],
7286
_defaultValue: T,
7387
valueConverter: String => T,
7488
stringConverter: T => String,
7589
doc: String,
7690
isPublic: Boolean)
77-
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
91+
extends ConfigEntry(
92+
key,
93+
prependedKey,
94+
prependSeparator,
95+
alternatives,
96+
valueConverter,
97+
stringConverter,
98+
doc,
99+
isPublic
100+
) {
78101

79102
override def defaultValue: Option[T] = Some(_defaultValue)
80103

@@ -86,14 +109,25 @@ private class ConfigEntryWithDefault[T] (
86109
}
87110

88111
private class ConfigEntryWithDefaultFunction[T] (
89-
key: String,
90-
alternatives: List[String],
91-
_defaultFunction: () => T,
92-
valueConverter: String => T,
93-
stringConverter: T => String,
94-
doc: String,
95-
isPublic: Boolean)
96-
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
112+
key: String,
113+
prependedKey: Option[String],
114+
prependSeparator: String,
115+
alternatives: List[String],
116+
_defaultFunction: () => T,
117+
valueConverter: String => T,
118+
stringConverter: T => String,
119+
doc: String,
120+
isPublic: Boolean)
121+
extends ConfigEntry(
122+
key,
123+
prependedKey,
124+
prependSeparator,
125+
alternatives,
126+
valueConverter,
127+
stringConverter,
128+
doc,
129+
isPublic
130+
) {
97131

98132
override def defaultValue: Option[T] = Some(_defaultFunction())
99133

@@ -106,13 +140,24 @@ private class ConfigEntryWithDefaultFunction[T] (
106140

107141
private class ConfigEntryWithDefaultString[T] (
108142
key: String,
143+
prependedKey: Option[String],
144+
prependSeparator: String,
109145
alternatives: List[String],
110146
_defaultValue: String,
111147
valueConverter: String => T,
112148
stringConverter: T => String,
113149
doc: String,
114150
isPublic: Boolean)
115-
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
151+
extends ConfigEntry(
152+
key,
153+
prependedKey,
154+
prependSeparator,
155+
alternatives,
156+
valueConverter,
157+
stringConverter,
158+
doc,
159+
isPublic
160+
) {
116161

117162
override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
118163

@@ -130,14 +175,23 @@ private class ConfigEntryWithDefaultString[T] (
130175
*/
131176
private[spark] class OptionalConfigEntry[T](
132177
key: String,
178+
prependedKey: Option[String],
179+
prependSeparator: String,
133180
alternatives: List[String],
134181
val rawValueConverter: String => T,
135182
val rawStringConverter: T => String,
136183
doc: String,
137184
isPublic: Boolean)
138-
extends ConfigEntry[Option[T]](key, alternatives,
185+
extends ConfigEntry[Option[T]](
186+
key,
187+
prependedKey,
188+
prependSeparator,
189+
alternatives,
139190
s => Some(rawValueConverter(s)),
140-
v => v.map(rawStringConverter).orNull, doc, isPublic) {
191+
v => v.map(rawStringConverter).orNull,
192+
doc,
193+
isPublic
194+
) {
141195

142196
override def defaultValueString: String = ConfigEntry.UNDEFINED
143197

@@ -151,12 +205,22 @@ private[spark] class OptionalConfigEntry[T](
151205
*/
152206
private[spark] class FallbackConfigEntry[T] (
153207
key: String,
208+
prependedKey: Option[String],
209+
prependSeparator: String,
154210
alternatives: List[String],
155211
doc: String,
156212
isPublic: Boolean,
157213
val fallback: ConfigEntry[T])
158-
extends ConfigEntry[T](key, alternatives,
159-
fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
214+
extends ConfigEntry[T](
215+
key,
216+
prependedKey,
217+
prependSeparator,
218+
alternatives,
219+
fallback.valueConverter,
220+
fallback.stringConverter,
221+
doc,
222+
isPublic
223+
) {
160224

161225
override def defaultValueString: String = s"<value of ${fallback.key}>"
162226

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ package object config {
4848
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
4949

5050
private[spark] val DRIVER_JAVA_OPTIONS =
51-
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
51+
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS)
52+
.withPrepended(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
53+
.stringConf
54+
.createOptional
5255

5356
private[spark] val DRIVER_LIBRARY_PATH =
5457
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
@@ -174,7 +177,10 @@ package object config {
174177
ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)
175178

176179
private[spark] val EXECUTOR_JAVA_OPTIONS =
177-
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
180+
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
181+
.withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)
182+
.stringConf
183+
.createOptional
178184

179185
private[spark] val EXECUTOR_LIBRARY_PATH =
180186
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional

0 commit comments

Comments (0)