
Commit f94499b

ifilonenko authored and foxish committed
SparkR Support (apache-spark-on-k8s#507)
* initial R support without integration tests
* finished sparkR integration
* case sensitive file names in unix
* revert back to previous lower case in dockerfile
* addition into the build-push-docker-images
1 parent 49932d6 commit f94499b

File tree: 15 files changed (+374, -27 lines)


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 2 deletions
@@ -346,8 +346,6 @@ object SparkSubmit extends CommandLineUtils {
     (clusterManager, deployMode) match {
       case (KUBERNETES, CLIENT) =>
         printErrorAndExit("Client mode is currently not supported for Kubernetes.")
-      case (KUBERNETES, CLUSTER) if args.isR =>
-        printErrorAndExit("Kubernetes does not currently support R applications.")
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")
@@ -642,6 +640,9 @@ object SparkSubmit extends CommandLineUtils {
       if (args.pyFiles != null) {
         childArgs ++= Array("--other-py-files", args.pyFiles)
       }
+    } else if (args.isR) {
+      childArgs ++= Array("--primary-r-file", args.primaryResource)
+      childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
     } else {
       childArgs ++= Array("--primary-java-resource", args.primaryResource)
       childArgs ++= Array("--main-class", args.mainClass)
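
For illustration, a minimal sketch of the child arguments that the KUBERNETES/CLUSTER branch now builds for an R application (the resource path is hypothetical, not taken from this diff):

    // Hypothetical example: with this change an R primary resource is no longer rejected;
    // SparkSubmit forwards it to org.apache.spark.deploy.k8s.submit.Client roughly as:
    val primaryResource = "local:///opt/spark/examples/src/main/r/dataframe.R" // hypothetical path
    val childArgs = Array(
      "--primary-r-file", primaryResource,
      "--main-class", "org.apache.spark.deploy.RRunner")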

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  private[spark] val ENV_R_FILE = "R_FILE"
   private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala

Lines changed: 2 additions & 0 deletions
@@ -46,6 +46,8 @@ private[spark] object ClientArguments {
     args.sliding(2, 2).toList.collect {
       case Array("--primary-py-file", mainPyFile: String) =>
         mainAppResource = Some(PythonMainAppResource(mainPyFile))
+      case Array("--primary-r-file", primaryRFile: String) =>
+        mainAppResource = Some(RMainAppResource(primaryRFile))
       case Array("--primary-java-resource", primaryJavaResource: String) =>
         mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
       case Array("--main-class", clazz: String) =>
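
For context, ClientArguments walks the submission arguments in flag/value pairs via sliding(2, 2). A standalone sketch of that parsing pattern, with hypothetical values:

    // Paste-able sketch: iterate the argument list two elements at a time and match pairs.
    val exampleArgs = Array(
      "--primary-r-file", "/opt/app/main.R", // hypothetical file
      "--main-class", "org.apache.spark.deploy.RRunner")
    exampleArgs.sliding(2, 2).toList.foreach {
      case Array("--primary-r-file", rFile) => println(s"primary R file: $rFile")
      case Array("--main-class", clazz) => println(s"main class: $clazz")
      case other => println(s"unrecognized pair: ${other.mkString(" ")}")
    }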

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 11 additions & 3 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.ConfigurationUtils
 import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
-import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
 import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
 import org.apache.spark.launcher.SparkLauncher
@@ -64,6 +64,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
         Option(resource)
       case _ => Option.empty
     }
+    val additionalMainAppRFile = mainAppResource match {
+      case RMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
+        Option(resource)
+      case _ => Option.empty
+    }
     val sparkJars = submissionSparkConf.getOption("spark.jars")
       .map(_.split(","))
       .getOrElse(Array.empty[String]) ++
@@ -72,6 +77,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       .map(_.split(","))
       .getOrElse(Array.empty[String]) ++
       additionalMainAppPythonFile.toSeq ++
+      additionalMainAppRFile.toSeq ++
       additionalPythonFiles
     val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
       submissionSparkConf,
@@ -108,9 +114,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
       submissionSparkConf)

-    val pythonStep = mainAppResource match {
+    val resourceStep = mainAppResource match {
       case PythonMainAppResource(mainPyResource) =>
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
+      case RMainAppResource(mainRFile) =>
+        Option(new RStep(mainRFile, filesDownloadPath))
       case _ => Option.empty[DriverConfigurationStep]
     }

@@ -188,7 +196,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       dependencyResolutionStep,
       localDirectoryMountConfigurationStep) ++
       submittedDependenciesBootstrapSteps ++
-      pythonStep.toSeq ++
+      resourceStep.toSeq ++
       mountSecretsStep.toSeq
   }
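
The orchestrator only assembles the ordered list of DriverConfigurationSteps; the submission client then applies them one after another. A rough sketch of that application, assuming the steps are folded over configureDriver (the empty initial KubernetesDriverSpec mirrors the one used in RStepSuite below):

    // Sketch (an assumption about how the step list is consumed, not code from this PR):
    // each step transforms the accumulated KubernetesDriverSpec, so RStep simply adds the
    // R_FILE env var to whatever driver container the earlier steps produced.
    // (These are spark-internal classes; the sketch assumes it lives in an org.apache.spark package.)
    import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}

    def runSteps(steps: Seq[DriverConfigurationStep]): KubernetesDriverSpec = {
      val initialSpec = KubernetesDriverSpec(
        new Pod(), new Container(), Seq.empty[HasMetadata], new SparkConf(false))
      steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
    }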

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala

Lines changed: 2 additions & 0 deletions
@@ -20,4 +20,6 @@ private[spark] sealed trait MainAppResource

 private[spark] case class PythonMainAppResource(primaryPyFile: String) extends MainAppResource

+private[spark] case class RMainAppResource(primaryRFile: String) extends MainAppResource
+
 private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
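
Because MainAppResource is sealed, every match over it (the child-args handling in SparkSubmit, the step selection in the orchestrator) is compiler-checked once RMainAppResource is added. A hypothetical helper, purely illustrative, mapping each variant to the driver main class SparkSubmit passes along:

    // Hypothetical helper, not part of this PR. Python and R apps get a fixed runner class,
    // while a plain Java/Scala resource keeps the user-supplied main class.
    def runnerClassFor(resource: MainAppResource, userMainClass: String): String = resource match {
      case PythonMainAppResource(_) => "org.apache.spark.deploy.PythonRunner"
      case RMainAppResource(_) => "org.apache.spark.deploy.RRunner"
      case JavaMainAppResource(_) => userMainClass
    }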

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesFileUtils
+
+private[spark] class RStep(
+    mainRFile: String,
+    filesDownloadPath: String) extends DriverConfigurationStep {
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val withRFileContainer = new ContainerBuilder(driverSpec.driverContainer)
+      .addNewEnv()
+        .withName(ENV_R_FILE)
+        .withValue(KubernetesFileUtils.resolveFilePath(mainRFile, filesDownloadPath))
+        .endEnv()
+    driverSpec.copy(driverContainer = withRFileContainer.build())
+  }
+}
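
The value written into R_FILE depends on the scheme of the primary resource. KubernetesFileUtils.resolveFilePath is not part of this commit; judging from the expectations in RStepSuite below, it behaves roughly like this sketch (an assumption, not the real implementation):

    // Assumed resolution rule, inferred from RStepSuite:
    //   local://  -> path already baked into the container image, used as-is
    //   other schemes (e.g. file://) -> the file is fetched into filesDownloadPath,
    //                                   so point at <filesDownloadPath>/<fileName>
    def resolveFilePathSketch(fileUri: String, filesDownloadPath: String): String = {
      val uri = new java.net.URI(fileUri)
      if (uri.getScheme == "local") uri.getPath
      else s"$filesDownloadPath/${new java.io.File(uri.getPath).getName}"
    }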

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala

Lines changed: 38 additions & 14 deletions
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.config._
-import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}

 private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {

@@ -45,7 +45,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       APP_NAME,
       MAIN_CLASS,
       APP_ARGS,
-      ADDITIONAL_PYTHON_FILES,
+      Seq.empty[String],
       sparkConf)
     validateStepTypes(
       orchestrator,
@@ -69,7 +69,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       APP_NAME,
       MAIN_CLASS,
       APP_ARGS,
-      ADDITIONAL_PYTHON_FILES,
+      Seq.empty[String],
       sparkConf)
     validateStepTypes(
       orchestrator,
@@ -85,15 +85,15 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
     val sparkConf = new SparkConf(false)
     val mainAppResource = PythonMainAppResource("local:///var/apps/python/main.py")
     val orchestrator = new DriverConfigurationStepsOrchestrator(
-        NAMESPACE,
-        APP_ID,
-        LAUNCH_TIME,
-        mainAppResource,
-        APP_NAME,
-        MAIN_CLASS,
-        APP_ARGS,
-        ADDITIONAL_PYTHON_FILES,
-        sparkConf)
+      NAMESPACE,
+      APP_ID,
+      LAUNCH_TIME,
+      mainAppResource,
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      ADDITIONAL_PYTHON_FILES,
+      sparkConf)
     validateStepTypes(
       orchestrator,
       classOf[BaseDriverConfigurationStep],
@@ -104,6 +104,30 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       classOf[PythonStep])
   }

+  test("Submission steps with R file.") {
+    val sparkConf = new SparkConf(false)
+    val mainAppResource = RMainAppResource("local:///var/apps/r/main.R")
+    val orchestrator = new DriverConfigurationStepsOrchestrator(
+      NAMESPACE,
+      APP_ID,
+      LAUNCH_TIME,
+      mainAppResource,
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      Seq.empty[String],
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BaseDriverConfigurationStep],
+      classOf[DriverAddressConfigurationStep],
+      classOf[DriverKubernetesCredentialsStep],
+      classOf[DependencyResolutionStep],
+      classOf[LocalDirectoryMountConfigurationStep],
+      classOf[RStep])
+  }
+
+
   test("Only local files without a resource staging server.") {
     val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt")
     val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
@@ -115,7 +139,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       APP_NAME,
       MAIN_CLASS,
       APP_ARGS,
-      ADDITIONAL_PYTHON_FILES,
+      Seq.empty[String],
       sparkConf)
     validateStepTypes(
       orchestrator,
@@ -140,7 +164,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       APP_NAME,
       MAIN_CLASS,
       APP_ARGS,
-      ADDITIONAL_PYTHON_FILES,
+      Seq.empty[String],
       sparkConf)
     validateStepTypes(
       orchestrator,
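
validateStepTypes is a pre-existing helper in this suite (its body is not part of this diff); presumably it just compares the classes of the orchestrator's steps against the expected sequence, along these lines:

    // Assumed shape of the existing helper, shown only to make the tests above easier to read:
    def validateStepTypes(
        orchestrator: DriverConfigurationStepsOrchestrator,
        types: Class[_ <: DriverConfigurationStep]*): Unit = {
      val steps = orchestrator.getAllConfigurationSteps()
      assert(steps.size === types.size)
      assert(steps.map(_.getClass) === types)
    }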

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import io.fabric8.kubernetes.api.model._
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class RStepSuite extends SparkFunSuite with BeforeAndAfter {
+  private val FILE_DOWNLOAD_PATH = "/var/data/spark-files"
+  private val R_PRIMARY_FILE_OP1 = "local:///app/files/file1.R"
+  private val RESOLVED_R_PRIMARY_FILE_OP1 = "/app/files/file1.R"
+  private val R_PRIMARY_FILE_OP2 = "file:///app/files/file2.R"
+  private val RESOLVED_R_PRIMARY_FILE_OP2 = FILE_DOWNLOAD_PATH + "/file2.R"
+
+  test("testing RSpark with local file") {
+    val rStep = new RStep(
+      R_PRIMARY_FILE_OP1,
+      FILE_DOWNLOAD_PATH)
+    val returnedDriverContainer = rStep.configureDriver(
+      KubernetesDriverSpec(
+        new Pod(),
+        new Container(),
+        Seq.empty[HasMetadata],
+        new SparkConf))
+    assert(returnedDriverContainer.driverContainer.getEnv
+      .asScala.map(env => (env.getName, env.getValue)).toMap ===
+      Map(
+        "R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP1))
+  }
+
+  test("testing RSpark with remote file") {
+    val rStep = new RStep(
+      R_PRIMARY_FILE_OP2,
+      FILE_DOWNLOAD_PATH)
+    val returnedDriverContainer = rStep.configureDriver(
+      KubernetesDriverSpec(
+        new Pod(),
+        new Container(),
+        Seq.empty[HasMetadata],
+        new SparkConf))
+    assert(returnedDriverContainer.driverContainer.getEnv
+      .asScala.map(env => (env.getName, env.getValue)).toMap ===
+      Map(
+        "R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP2))
+  }
+
+}

driver-r/Dockerfile

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-driver-r:latest -f dockerfiles/driver-r/Dockerfile .
+
+ADD examples /opt/spark/examples
+ADD R /opt/spark/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $R_FILE $SPARK_DRIVER_ARGS
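
Tying the pieces together for this image: SPARK_DRIVER_CLASS is expected to resolve to org.apache.spark.deploy.RRunner (the --main-class child argument that SparkSubmit now emits for R applications, exported into the pod by the driver configuration steps), and R_FILE is the environment variable injected by RStep above, so the final java invocation launches RRunner with the resolved path of the user's R script followed by the application arguments.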

executor-r/Dockerfile

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-executor-r:latest -f dockerfiles/executor-r/Dockerfile .
+
+ADD examples /opt/spark/examples
+ADD R /opt/spark/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
+    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
