Skip to content

Commit b84056e

Browse files
Sahil Prasadmccheah
authored andcommitted
Fail submission if submitter-local files are provided without resourc… (apache-spark-on-k8s#447)
* Fail submission if submitter-local files are provided without resource staging server URI * Modified logic to validate only submitted jars; added orchestrator tests * Incorporated feedback * Fix failing test case
1 parent be394c6 commit b84056e

File tree

2 files changed

+82
-2
lines changed

2 files changed

+82
-2
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
2020
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
2121
import org.apache.spark.deploy.kubernetes.config._
2222
import org.apache.spark.deploy.kubernetes.constants._
23-
import org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl
23+
import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl}
2424
import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
2525
import org.apache.spark.util.Utils
2626

@@ -62,6 +62,12 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
6262
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
6363
.orElse(submissionSparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
6464
.getOrElse(false)
65+
66+
OptionRequirements.requireSecondIfFirstIsDefined(
67+
KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).headOption,
68+
resourceStagingServerUri,
69+
"Local JARs were provided, however no resource staging server URI was found.")
70+
6571
OptionRequirements.requireNandDefined(
6672
maybeResourceStagingServerInternalClientCert,
6773
maybeResourceStagingServerInternalTrustStore,

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,80 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {
4444
private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
4545
private val STAGING_SERVER_URI = "http://localhost:8000"
4646

47+
test ("error thrown if local jars provided without resource staging server") {
48+
val sparkConf = new SparkConf(true)
49+
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
50+
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
51+
52+
assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty)
53+
54+
val thrown = intercept[IllegalArgumentException] {
55+
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
56+
NAMESPACE,
57+
APP_RESOURCE_PREFIX,
58+
SPARK_JARS,
59+
SPARK_FILES,
60+
JARS_DOWNLOAD_PATH,
61+
FILES_DOWNLOAD_PATH,
62+
DOCKER_IMAGE_PULL_POLICY,
63+
DRIVER_LABELS,
64+
INIT_CONTAINER_CONFIG_MAP_NAME,
65+
INIT_CONTAINER_CONFIG_MAP_KEY,
66+
sparkConf)
67+
}
68+
69+
assert(thrown.getMessage contains "Local JARs were provided, however no resource staging" +
70+
" server URI was found.")
71+
}
72+
73+
test ("error not thrown with non-local jars and resource staging server provided") {
74+
val sparkConf = new SparkConf(true)
75+
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
76+
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
77+
.set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)
78+
79+
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
80+
NAMESPACE,
81+
APP_RESOURCE_PREFIX,
82+
SPARK_JARS.take(1),
83+
SPARK_FILES,
84+
JARS_DOWNLOAD_PATH,
85+
FILES_DOWNLOAD_PATH,
86+
DOCKER_IMAGE_PULL_POLICY,
87+
DRIVER_LABELS,
88+
INIT_CONTAINER_CONFIG_MAP_NAME,
89+
INIT_CONTAINER_CONFIG_MAP_KEY,
90+
sparkConf)
91+
val initSteps : Seq[InitContainerConfigurationStep] =
92+
orchestrator.getAllConfigurationSteps()
93+
assert(initSteps.length == 2)
94+
assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep])
95+
assert(initSteps(1).isInstanceOf[SubmittedResourcesInitContainerConfigurationStep])
96+
}
97+
98+
test ("error not thrown with non-local jars and no resource staging server provided") {
99+
val sparkConf = new SparkConf(true)
100+
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
101+
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
102+
103+
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
104+
NAMESPACE,
105+
APP_RESOURCE_PREFIX,
106+
SPARK_JARS.take(1),
107+
SPARK_FILES,
108+
JARS_DOWNLOAD_PATH,
109+
FILES_DOWNLOAD_PATH,
110+
DOCKER_IMAGE_PULL_POLICY,
111+
DRIVER_LABELS,
112+
INIT_CONTAINER_CONFIG_MAP_NAME,
113+
INIT_CONTAINER_CONFIG_MAP_KEY,
114+
sparkConf)
115+
val initSteps : Seq[InitContainerConfigurationStep] =
116+
orchestrator.getAllConfigurationSteps()
117+
assert(initSteps.length == 1)
118+
assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep])
119+
}
120+
47121
test ("including step to contact resource staging server") {
48122
val sparkConf = new SparkConf(true)
49123
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
@@ -77,7 +151,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {
77151
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
78152
NAMESPACE,
79153
APP_RESOURCE_PREFIX,
80-
SPARK_JARS,
154+
SPARK_JARS.take(1),
81155
SPARK_FILES,
82156
JARS_DOWNLOAD_PATH,
83157
FILES_DOWNLOAD_PATH,

0 commit comments

Comments
 (0)