Commit a72151f

Merge pull request apache-spark-on-k8s#258 from palantir/resync-kube
[NOSQUASH] Resync kube
2 parents: c2f7e47 + 9c3a55b

7 files changed: +64 -20 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,8 @@ private[spark] class MountSmallLocalFilesStep(

   import MountSmallLocalFilesStep._
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles).map(new File(_))
+    val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
+      .map(localFileUri => new File(Utils.resolveURI(localFileUri).getPath))
     val totalSizeBytes = localFiles.map(_.length()).sum
     val totalSizeBytesString = Utils.bytesToString(totalSizeBytes)
     require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES,
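A note on the fix above: entries in spark.files may be given either as bare local paths or as file:// URIs, and the old new File(_) call would have kept the scheme as part of the file name for the latter. Utils.resolveURI normalizes both spellings to a plain path; since Utils is private[spark], the sketch below uses a stand-in with the same behavior for these two cases (an assumption, not Spark's exact implementation):

import java.io.File
import java.net.URI

object ResolveUriSketch {
  // Stand-in for Utils.resolveURI: treat a scheme-less string as a local
  // file path, and leave proper URIs alone.
  def resolveUri(raw: String): URI = {
    val uri = new URI(raw)
    if (uri.getScheme != null) uri else new File(raw).getAbsoluteFile.toURI
  }

  def main(args: Array[String]): Unit = {
    val bare = resolveUri("/tmp/file1.txt")
    val withScheme = resolveUri("file:///tmp/file1.txt")
    // Both spellings resolve to the same filesystem path.
    assert(bare.getPath == withScheme.getPath)
    // The old code, new File("file:///tmp/file1.txt"), would have looked for
    // a file literally named "file:/tmp/file1.txt".
    println(new File(withScheme.getPath))
  }
}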

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 4 deletions
@@ -457,7 +457,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withValue(cp)
         .build()
     }
-    val requiredEnv = (Seq(
+    val executorExtraJavaOptionsEnv = conf
+      .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
@@ -477,7 +486,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withNewFieldRef("v1", "status.podIP")
         .build())
       .build()
-    )
+    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -497,8 +506,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .addToLimits("memory", executorMemoryLimitQuantity)
         .addToRequests("cpu", executorCpuQuantity)
         .endResources()
-      .addAllToEnv(requiredEnv.asJava)
-      .addToEnv(executorExtraClasspathEnv.toSeq: _*)
+      .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .build()
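The scheduler-backend change above is the driver half of a simple contract: spark.executor.extraJavaOptions arrives as one quoted string, Utils.splitCommandString tokenizes it (honoring quotes, so values with spaces stay single tokens), and each token is exported to the executor pod as SPARK_JAVA_OPT_<index>. A minimal sketch of that tokenization, modeling env vars as plain name/value pairs rather than fabric8 EnvVar objects; Utils is private[spark], hence the org.apache.spark subpackage:

package org.apache.spark.examples

import org.apache.spark.util.Utils

object ExecutorJavaOptsEnvSketch {
  // Mirrors the ENV_JAVA_OPT_PREFIX constant the diff uses.
  val EnvJavaOptPrefix = "SPARK_JAVA_OPT_"

  def main(args: Array[String]): Unit = {
    val opts = "-DsimpleExecutorConf=simpleExecutorConfValue" +
      " -D'executor conf with spaces'='executor conf with spaces value'"
    // splitCommandString honors quotes, so spaced values stay single tokens.
    val envVars = Utils.splitCommandString(opts).zipWithIndex.map {
      case (opt, index) => (s"$EnvJavaOptPrefix$index", opt)
    }
    envVars.foreach { case (name, value) => println(s"$name=$value") }
    // Prints:
    // SPARK_JAVA_OPT_0=-DsimpleExecutorConf=simpleExecutorConfValue
    // SPARK_JAVA_OPT_1=-Dexecutor conf with spaces=executor conf with spaces value
  }
}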

Lines changed: 3 additions & 3 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrap
 import org.apache.spark.util.Utils

-private[spark] class MountSmallLocalFilesStepTest extends SparkFunSuite with BeforeAndAfter {
+private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with BeforeAndAfter {

   private val FIRST_TEMP_FILE_NAME = "file1.txt"
   private val SECOND_TEMP_FILE_NAME = "file2.txt"
@@ -56,11 +56,11 @@ private[spark] class MountSmallLocalFilesStepTest extends SparkFunSuite with BeforeAndAfter {

   test("Local files should be added to the secret.") {
     val firstTempFile = createTempFileWithContents(
-        tempFolder, FIRST_TEMP_FILE_NAME, FIRST_TEMP_FILE_CONTENTS)
+      tempFolder, FIRST_TEMP_FILE_NAME, FIRST_TEMP_FILE_CONTENTS)
     val secondTempFile = createTempFileWithContents(
       tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS)
     val sparkFiles = Seq(
-      firstTempFile.getAbsolutePath,
+      s"file://${firstTempFile.getAbsolutePath}",
       secondTempFile.getAbsolutePath,
       REMOTE_FILE_URI)
     val configurationStep = new MountSmallLocalFilesStep(

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile

Lines changed: 3 additions & 2 deletions
@@ -38,10 +38,11 @@ ENV PYSPARK_PYTHON python
 ENV PYSPARK_DRIVER_PYTHON python
 ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile

Lines changed: 3 additions & 2 deletions
@@ -23,11 +23,12 @@ FROM spark-base

 COPY examples /opt/spark/examples

-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala

Lines changed: 28 additions & 5 deletions
@@ -29,28 +29,51 @@ private[spark] object JavaOptionsTest {

   def main(args: Array[String]): Unit = {
     // scalastyle:off println
-    if (args.length != 1) {
+    if (args.length != 2) {
       println(s"Invalid arguments: ${args.mkString(",")}." +
-        s"Usage: JavaOptionsTest <driver-java-options-list-file>")
+        s"Usage: JavaOptionsTest <driver-java-options-list-file> <executor-java-options-list-file>")
       System.exit(1)
     }
     val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
+    val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1))
     val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
       case (optKey, optValue) => System.getProperty(optKey) != optValue
     }
     if (nonMatchingDriverOptions.nonEmpty) {
       println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
         s" But these options did not match: $nonMatchingDriverOptions.")
       val sysProps = Maps.fromProperties(System.getProperties).asScala
-      println("System properties are:")
+      println("Driver system properties are:")
       for (prop <- sysProps) {
         println(s"Key: ${prop._1}, Value: ${prop._2}")
       }
       System.exit(1)
     }

-    // TODO support spark.executor.extraJavaOptions and test here.
-    println(s"All expected JVM options were present on the driver and executors.")
+    val spark = SparkSession.builder().getOrCreate().sparkContext
+    try {
+      val nonMatchingExecutorOptions = spark.parallelize(Seq(0)).flatMap { _ =>
+        expectedExecutorJavaOptions.filter {
+          case (optKey, optValue) => System.getProperty(optKey) != optValue
+        }
+      }.collectAsMap()
+      if (nonMatchingExecutorOptions.nonEmpty) {
+        val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ =>
+          Maps.fromProperties(System.getProperties).asScala
+        }.collectAsMap()
+        println(s"The executor's JVM options did not match. Expected" +
+          s" $expectedExecutorJavaOptions. But these options did not" +
+          s" match: $nonMatchingExecutorOptions.")
+        println("Executor system properties are:")
+        for (prop <- executorSysProps) {
+          println(s"Key: ${prop._1}, Value: ${prop._2}")
+        }
+      } else {
+        println("All expected JVM options were present on the driver and executors.")
+      }
+    } finally {
+      spark.stop()
+    }
     // scalastyle:on println
   }

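The executor-side assertion above works by shipping a closure to an executor through a one-element RDD: the filter runs inside the executor JVM, so System.getProperty sees the executor's -D flags, and collectAsMap() carries any mismatches back to the driver. A stripped-down sketch of that probing pattern (the property name here is just an example):

import org.apache.spark.sql.SparkSession

object ExecutorSysPropProbe {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder().getOrCreate().sparkContext
    try {
      // The map closure executes on an executor, not on the driver.
      val observed = sc.parallelize(Seq(0), numSlices = 1)
        .map(_ => Option(System.getProperty("simpleExecutorConf")).getOrElse("<unset>"))
        .collect()
        .head
      println(s"simpleExecutorConf on the executor: $observed")
    } finally {
      sc.stop()
    }
  }
}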

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 13 additions & 3 deletions
@@ -234,18 +234,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     launchStagingServer(SSLOptions(), None)
     val driverJvmOptionsFile = storeJvmOptionsInTempFile(
       Map("simpleDriverConf" -> "simpleDriverConfValue",
-          "driverconfwithspaces" -> "driver conf with spaces value"),
+        "driverconfwithspaces" -> "driver conf with spaces value"),
       "driver-jvm-options.properties",
       "JVM options that should be set on the driver.")
+    val executorJvmOptionsFile = storeJvmOptionsInTempFile(
+      Map("simpleExecutorConf" -> "simpleExecutorConfValue",
+        "executor conf with spaces" -> "executor conf with spaces value"),
+      "executor-jvm-options.properties",
+      "JVM options that should be set on the executors.")
     sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
       "-DsimpleDriverConf=simpleDriverConfValue" +
         " -Ddriverconfwithspaces='driver conf with spaces value'")
-    sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
+    sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+      "-DsimpleExecutorConf=simpleExecutorConfValue" +
+        " -D\'executor conf with spaces\'=\'executor conf with spaces value\'")
+    sparkConf.set("spark.files",
+      Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath)
+        .mkString(","))
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       JAVA_OPTIONS_MAIN_CLASS,
       Seq(s"All expected JVM options were present on the driver and executors."),
-      Array(driverJvmOptionsFile.getName),
+      Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName),
       Seq.empty[String])
   }
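The expected options reach the application as Java properties files shipped through spark.files and referenced by file name on the command line. The helpers storeJvmOptionsInTempFile and loadPropertiesFromFile are not part of this diff; the sketch below is only a plausible reading of what such a round trip amounts to, with hypothetical signatures:

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.Properties
import scala.collection.JavaConverters._

object JvmOptionsFileSketch {
  // Hypothetical equivalent of storeJvmOptionsInTempFile.
  def store(opts: Map[String, String], fileName: String, comment: String): File = {
    val file = new File(System.getProperty("java.io.tmpdir"), fileName)
    val props = new Properties()
    opts.foreach { case (k, v) => props.setProperty(k, v) }
    val out = new FileOutputStream(file)
    try props.store(out, comment) finally out.close()
    file
  }

  // Hypothetical equivalent of loadPropertiesFromFile.
  def load(path: String): Map[String, String] = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props.asScala.toMap
  }

  def main(args: Array[String]): Unit = {
    val f = store(Map("simpleExecutorConf" -> "simpleExecutorConfValue"),
      "executor-jvm-options.properties", "JVM options that should be set on the executors.")
    assert(load(f.getAbsolutePath)("simpleExecutorConf") == "simpleExecutorConfValue")
  }
}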
