Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 2cb2074

Browse files
mccheahash211
authored andcommitted
Support executor java options. (#445)
1 parent f7b5820 commit 2cb2074

File tree

5 files changed

+64
-20
lines changed

5 files changed

+64
-20
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
455455
.withValue(cp)
456456
.build()
457457
}
458-
val requiredEnv = (Seq(
458+
val executorExtraJavaOptionsEnv = conf
459+
.get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
460+
.map { opts =>
461+
val delimitedOpts = Utils.splitCommandString(opts)
462+
delimitedOpts.zipWithIndex.map {
463+
case (opt, index) =>
464+
new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
465+
}
466+
}.getOrElse(Seq.empty[EnvVar])
467+
val executorEnv = (Seq(
459468
(ENV_EXECUTOR_PORT, executorPort.toString),
460469
(ENV_DRIVER_URL, driverUrl),
461470
// Executor backend expects integral value for executor cores, so round it up to an int.
@@ -475,7 +484,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
475484
.withNewFieldRef("v1", "status.podIP")
476485
.build())
477486
.build()
478-
)
487+
) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
479488
val requiredPorts = Seq(
480489
(EXECUTOR_PORT_NAME, executorPort),
481490
(BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -495,8 +504,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
495504
.addToLimits("memory", executorMemoryLimitQuantity)
496505
.addToRequests("cpu", executorCpuQuantity)
497506
.endResources()
498-
.addAllToEnv(requiredEnv.asJava)
499-
.addToEnv(executorExtraClasspathEnv.toSeq: _*)
507+
.addAllToEnv(executorEnv.asJava)
500508
.withPorts(requiredPorts.asJava)
501509
.build()
502510

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ ENV PYSPARK_PYTHON python
3838
ENV PYSPARK_DRIVER_PYTHON python
3939
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
4040

41-
# TODO support spark.executor.extraClassPath
4241
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
42+
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
43+
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
4344
if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
4445
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
45-
if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
46-
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
47-
${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
46+
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
47+
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
48+
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ FROM spark-base
2323

2424
COPY examples /opt/spark/examples
2525

26-
# TODO support spark.executor.extraClassPath
2726
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
27+
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
28+
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
2829
if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
2930
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
3031
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
31-
if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
32-
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
33-
${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
32+
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
33+
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
34+
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,52 @@ private[spark] object JavaOptionsTest {
2929

3030
def main(args: Array[String]): Unit = {
3131
// scalastyle:off println
32-
if (args.length != 1) {
32+
if (args.length != 2) {
3333
println(s"Invalid arguments: ${args.mkString(",")}." +
34-
s"Usage: JavaOptionsTest <driver-java-options-list-file>")
34+
s"Usage: JavaOptionsTest <driver-java-options-list-file> <executor-java-options-list-file>")
3535
System.exit(1)
3636
}
3737
val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
38+
val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1))
3839
val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
3940
case (optKey, optValue) => System.getProperty(optKey) != optValue
4041
}
4142
if (nonMatchingDriverOptions.nonEmpty) {
4243
println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
4344
s" But these options did not match: $nonMatchingDriverOptions.")
4445
val sysProps = Maps.fromProperties(System.getProperties).asScala
45-
println("System properties are:")
46+
println("Driver system properties are:")
4647
for (prop <- sysProps) {
4748
println(s"Key: ${prop._1}, Value: ${prop._2}")
4849
}
4950
System.exit(1)
5051
}
5152

52-
// TODO support spark.executor.extraJavaOptions and test here.
53-
println(s"All expected JVM options were present on the driver and executors.")
53+
val spark = SparkSession.builder().getOrCreate().sparkContext
54+
val nonMatchingExecutorOptions = try {
55+
spark.parallelize(Seq(0)).flatMap { _ =>
56+
expectedExecutorJavaOptions.filter {
57+
case (optKey, optValue) => System.getProperty(optKey) != optValue
58+
}
59+
}.collectAsMap()
60+
} finally {
61+
spark.stop()
62+
}
63+
if (nonMatchingExecutorOptions.nonEmpty) {
64+
val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ =>
65+
Maps.fromProperties(System.getProperties).asScala
66+
}.collectAsMap()
67+
println(s"The executor's JVM options did not match. Expected" +
68+
s" $expectedExecutorJavaOptions. But these options did not" +
69+
s" match: $nonMatchingExecutorOptions.")
70+
println("Executor system properties are:")
71+
for (prop <- executorSysProps) {
72+
println(s"Key: ${prop._1}, Value: ${prop._2}")
73+
}
74+
System.exit(1)
75+
} else {
76+
println("All expected JVM options were present on the driver and executors.")
77+
}
5478
// scalastyle:on println
5579
}
5680

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,18 +234,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
234234
launchStagingServer(SSLOptions(), None)
235235
val driverJvmOptionsFile = storeJvmOptionsInTempFile(
236236
Map("simpleDriverConf" -> "simpleDriverConfValue",
237-
"driverconfwithspaces" -> "driver conf with spaces value"),
237+
"driverconfwithspaces" -> "driver conf with spaces value"),
238238
"driver-jvm-options.properties",
239239
"JVM options that should be set on the driver.")
240+
val executorJvmOptionsFile = storeJvmOptionsInTempFile(
241+
Map("simpleExecutorConf" -> "simpleExecutorConfValue",
242+
"executor conf with spaces" -> "executor conf with spaces value"),
243+
"executor-jvm-options.properties",
244+
"JVM options that should be set on the executors.")
240245
sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
241246
"-DsimpleDriverConf=simpleDriverConfValue" +
242247
" -Ddriverconfwithspaces='driver conf with spaces value'")
243-
sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
248+
sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
249+
"-DsimpleExecutorConf=simpleExecutorConfValue" +
250+
" -D\'executor conf with spaces\'=\'executor conf with spaces value\'")
251+
sparkConf.set("spark.files",
252+
Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath)
253+
.mkString(","))
244254
runSparkApplicationAndVerifyCompletion(
245255
JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
246256
JAVA_OPTIONS_MAIN_CLASS,
247257
Seq(s"All expected JVM options were present on the driver and executors."),
248-
Array(driverJvmOptionsFile.getName),
258+
Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName),
249259
Seq.empty[String])
250260
}
251261

0 commit comments

Comments
 (0)