Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 8f6f0a0

Browse files
mccheah authored and ash211 committed
Differentiate between URI and SSL settings for in-cluster vs. submission (#281)
1 parent 88306b2 commit 8f6f0a0

File tree

10 files changed

+341
-130
lines changed

10 files changed

+341
-130
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes

/**
 * Helpers for validating relationships between interdependent optional
 * configuration values.
 *
 * Every check reports a violation by raising an `IllegalArgumentException`
 * (via `require`) carrying the caller-supplied message.
 */
private[spark] object OptionRequirements {

  /**
   * Requires that `opt1` and `opt2` are either both defined or both empty.
   *
   * @param opt1 first option to check
   * @param opt2 second option to check
   * @param errMessageWhenFirstIsMissing message used when only `opt2` is defined
   * @param errMessageWhenSecondIsMissing message used when only `opt1` is defined
   */
  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    // Check each direction of the implication separately so the failure
    // message names exactly the option that is missing.
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  /**
   * Requires that `opt2` is defined whenever `opt1` is defined
   * (i.e. `opt1` defined implies `opt2` defined). A no-op when `opt1` is empty.
   */
  def requireSecondIfFirstIsDefined(
      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
    if (opt1.isDefined) {
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

  /**
   * Requires that `opt1` and `opt2` are not both defined (logical NAND).
   * Either one alone, or neither, is acceptable.
   */
  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    if (opt1.isDefined) {
      require(opt2.isEmpty, errMessage)
    }
  }
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,8 @@ package object config extends Logging {
362362
.createOptional
363363

364364
private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
365+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE =
366+
"kubernetes.resourceStagingServer.internal"
365367
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
366368
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
367369
.doc("Certificate PEM file to use when having the resource staging server" +
@@ -370,47 +372,98 @@ package object config extends Logging {
370372
.createOptional
371373
private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM =
372374
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem")
373-
.doc("Certificate PEM file to use when the client contacts the resource staging server.")
375+
.doc("Certificate PEM file to use when the client contacts the resource staging server." +
376+
" This must strictly be a path to a file on the submitting machine's disk.")
377+
.stringConf
378+
.createOptional
379+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM =
380+
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.clientCertPem")
381+
.doc("Certificate PEM file to use when the init-container contacts the resource staging" +
382+
" server. If this is not provided, it defaults to the value of" +
383+
" spark.ssl.kubernetes.resourceStagingServer.clientCertPem. This can be a URI with" +
384+
" a scheme of local:// which denotes that the file is pre-mounted on the init-container's" +
385+
" disk. A uri without a scheme or a scheme of file:// will result in this file being" +
386+
" mounted from the submitting machine's disk as a secret into the pods.")
374387
.stringConf
375388
.createOptional
376-
377389
private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
378390
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
379-
.doc("File containing the keystore password for the Kubernetes dependency server.")
391+
.doc("File containing the keystore password for the Kubernetes resource staging server.")
380392
.stringConf
381393
.createOptional
382394

383395
private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
384396
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile")
385-
.doc("File containing the key password for the Kubernetes dependency server.")
397+
.doc("File containing the key password for the Kubernetes resource staging server.")
386398
.stringConf
387399
.createOptional
388400

389401
private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
390402
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled")
391-
.doc("Whether or not to use SSL when communicating with the dependency server.")
403+
.doc("Whether or not to use SSL when communicating with the resource staging server.")
404+
.booleanConf
405+
.createOptional
406+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED =
407+
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.enabled")
408+
.doc("Whether or not to use SSL when communicating with the resource staging server from" +
409+
" the init-container. If this is not provided, defaults to" +
410+
" the value of spark.ssl.kubernetes.resourceStagingServer.enabled")
392411
.booleanConf
393412
.createOptional
394413
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
395414
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore")
396-
.doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
415+
.doc("File containing the trustStore to communicate with the Kubernetes dependency server." +
416+
" This must strictly be a path on the submitting machine's disk.")
417+
.stringConf
418+
.createOptional
419+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE =
420+
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStore")
421+
.doc("File containing the trustStore to communicate with the Kubernetes dependency server" +
422+
" from the init-container. If this is not provided, defaults to the value of" +
423+
" spark.ssl.kubernetes.resourceStagingServer.trustStore. This can be a URI with a scheme" +
424+
" of local:// indicating that the trustStore is pre-mounted on the init-container's" +
425+
" disk. If no scheme, or a scheme of file:// is provided, this file is mounted from the" +
426+
" submitting machine's disk as a Kubernetes secret into the pods.")
397427
.stringConf
398428
.createOptional
399429
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
400430
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword")
401-
.doc("Password for the trustStore for talking to the dependency server.")
431+
.doc("Password for the trustStore for communicating to the dependency server.")
432+
.stringConf
433+
.createOptional
434+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD =
435+
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStorePassword")
436+
.doc("Password for the trustStore for communicating to the dependency server from the" +
437+
" init-container. If this is not provided, defaults to" +
438+
" spark.ssl.kubernetes.resourceStagingServer.trustStorePassword.")
402439
.stringConf
403440
.createOptional
404441
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
405442
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStoreType")
406443
.doc("Type of trustStore for communicating with the dependency server.")
407444
.stringConf
408445
.createOptional
446+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE =
447+
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStoreType")
448+
.doc("Type of trustStore for communicating with the dependency server from the" +
449+
" init-container. If this is not provided, defaults to" +
450+
" spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
451+
.stringConf
452+
.createOptional
409453

410454
// Driver and Init-Container parameters for submission v2
411455
private[spark] val RESOURCE_STAGING_SERVER_URI =
412456
ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
413-
.doc("Base URI for the Spark resource staging server")
457+
.doc("Base URI for the Spark resource staging server.")
458+
.stringConf
459+
.createOptional
460+
461+
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_URI =
462+
ConfigBuilder("spark.kubernetes.resourceStagingServer.internal.uri")
463+
.doc("Base URI for the Spark resource staging server when the init-containers access it for" +
464+
" downloading resources. If this is not provided, it defaults to the value provided in" +
465+
" spark.kubernetes.resourceStagingServer.uri, the URI that the submission client uses to" +
466+
" upload the resources from outside the cluster.")
414467
.stringConf
415468
.createOptional
416469

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ package object constants {
115115
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY =
116116
"downloadSubmittedFilesSecret"
117117
private[spark] val INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY = "trustStore"
118+
private[spark] val INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY = "ssl-certificate"
118119
private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "download-submitted-files"
119120
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
120121
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
package org.apache.spark.deploy.kubernetes.submit.v2
1818

1919
import org.apache.spark.{SparkConf, SSLOptions}
20-
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
20+
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
2121
import org.apache.spark.deploy.kubernetes.config._
2222
import org.apache.spark.deploy.kubernetes.constants._
2323
import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl
24+
import org.apache.spark.util.Utils
2425

2526
/**
2627
* Interface that wraps the provision of everything the submission client needs to set up the
@@ -47,10 +48,51 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
4748
kubernetesAppId: String,
4849
sparkJars: Seq[String],
4950
sparkFiles: Seq[String],
50-
resourceStagingServerSslOptions: SSLOptions)
51+
resourceStagingServerExternalSslOptions: SSLOptions)
5152
extends DriverInitContainerComponentsProvider {
5253

5354
private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
55+
private val maybeResourceStagingServerInternalUri =
56+
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
57+
private val maybeResourceStagingServerInternalTrustStore =
58+
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE)
59+
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE))
60+
private val maybeResourceStagingServerInternalTrustStorePassword =
61+
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD)
62+
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD))
63+
private val maybeResourceStagingServerInternalTrustStoreType =
64+
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE)
65+
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE))
66+
private val maybeResourceStagingServerInternalClientCert =
67+
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM)
68+
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM))
69+
private val resourceStagingServerInternalSslEnabled =
70+
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
71+
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
72+
.getOrElse(false)
73+
74+
OptionRequirements.requireNandDefined(
75+
maybeResourceStagingServerInternalClientCert,
76+
maybeResourceStagingServerInternalTrustStore,
77+
"Cannot provide both a certificate file and a trustStore file for init-containers to" +
78+
" use for contacting the resource staging server over TLS.")
79+
80+
require(maybeResourceStagingServerInternalTrustStore.forall { trustStore =>
81+
Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") match {
82+
case "file" | "local" => true
83+
case _ => false
84+
}
85+
}, "TrustStore URI used for contacting the resource staging server from init containers must" +
86+
" have no scheme, or scheme file://, or scheme local://.")
87+
88+
require(maybeResourceStagingServerInternalClientCert.forall { trustStore =>
89+
Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") match {
90+
case "file" | "local" => true
91+
case _ => false
92+
}
93+
}, "Client cert file URI used for contacting the resource staging server from init containers" +
94+
" must have no scheme, or scheme file://, or scheme local://.")
95+
5496
private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
5597
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
5698
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
@@ -71,14 +113,20 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
71113
filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId)
72114
} yield {
73115
new SubmittedDependencyInitContainerConfigPluginImpl(
74-
stagingServerUri,
116+
// Configure the init-container with the internal URI over the external URI.
117+
maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
75118
jarsResourceId,
76119
filesResourceId,
77120
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
78121
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
79122
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
80-
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
81-
resourceStagingServerSslOptions)
123+
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
124+
resourceStagingServerInternalSslEnabled,
125+
maybeResourceStagingServerInternalTrustStore,
126+
maybeResourceStagingServerInternalClientCert,
127+
maybeResourceStagingServerInternalTrustStorePassword,
128+
maybeResourceStagingServerInternalTrustStoreType,
129+
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
82130
}
83131
new SparkInitContainerConfigMapBuilderImpl(
84132
sparkJars,
@@ -113,7 +161,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
113161
stagingServerUri,
114162
sparkJars,
115163
sparkFiles,
116-
resourceStagingServerSslOptions,
164+
resourceStagingServerExternalSslOptions,
117165
RetrofitClientFactoryImpl)
118166
}
119167
}
@@ -133,7 +181,9 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
133181
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
134182
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
135183
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
136-
resourceStagingServerSslOptions)
184+
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
185+
maybeResourceStagingServerInternalTrustStore,
186+
maybeResourceStagingServerInternalClientCert)
137187
}
138188
}
139189

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPlugin.scala

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit.v2
1818

19-
import org.apache.spark.SSLOptions
19+
import org.apache.spark.SparkException
2020
import org.apache.spark.deploy.kubernetes.config._
21-
import org.apache.spark.deploy.kubernetes.constants._
21+
import org.apache.spark.internal.config.OptionalConfigEntry
22+
import org.apache.spark.util.Utils
2223

2324
private[spark] trait SubmittedDependencyInitContainerConfigPlugin {
2425
/**
@@ -34,36 +35,62 @@ private[spark] trait SubmittedDependencyInitContainerConfigPlugin {
3435
}
3536

3637
private[spark] class SubmittedDependencyInitContainerConfigPluginImpl(
37-
resourceStagingServerUri: String,
38+
internalResourceStagingServerUri: String,
3839
jarsResourceId: String,
3940
filesResourceId: String,
4041
jarsSecretKey: String,
4142
filesSecretKey: String,
4243
trustStoreSecretKey: String,
43-
secretsVolumeMountPath: String,
44-
resourceStagingServiceSslOptions: SSLOptions)
44+
clientCertSecretKey: String,
45+
resourceStagingServerSslEnabled: Boolean,
46+
maybeInternalTrustStoreUri: Option[String],
47+
maybeInternalClientCertUri: Option[String],
48+
maybeInternalTrustStorePassword: Option[String],
49+
maybeInternalTrustStoreType: Option[String],
50+
secretsVolumeMountPath: String)
4551
extends SubmittedDependencyInitContainerConfigPlugin {
4652

4753
override def configurationsToFetchSubmittedDependencies(): Map[String, String] = {
4854
Map[String, String](
49-
RESOURCE_STAGING_SERVER_URI.key -> resourceStagingServerUri,
55+
RESOURCE_STAGING_SERVER_URI.key -> internalResourceStagingServerUri,
5056
INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> jarsResourceId,
5157
INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key ->
5258
s"$secretsVolumeMountPath/$jarsSecretKey",
5359
INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> filesResourceId,
5460
INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key ->
5561
s"$secretsVolumeMountPath/$filesSecretKey",
56-
RESOURCE_STAGING_SERVER_SSL_ENABLED.key ->
57-
resourceStagingServiceSslOptions.enabled.toString) ++
58-
resourceStagingServiceSslOptions.trustStore.map { _ =>
59-
(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key,
60-
s"$secretsVolumeMountPath/$trustStoreSecretKey")
61-
}.toMap ++
62-
resourceStagingServiceSslOptions.trustStorePassword.map { password =>
62+
RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> resourceStagingServerSslEnabled.toString) ++
63+
resolveSecretPath(
64+
maybeInternalTrustStoreUri,
65+
trustStoreSecretKey,
66+
RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE,
67+
"TrustStore URI") ++
68+
resolveSecretPath(
69+
maybeInternalClientCertUri,
70+
clientCertSecretKey,
71+
RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM,
72+
"Client certificate URI") ++
73+
maybeInternalTrustStorePassword.map { password =>
6374
(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key, password)
6475
}.toMap ++
65-
resourceStagingServiceSslOptions.trustStoreType.map { storeType =>
76+
maybeInternalTrustStoreType.map { storeType =>
6677
(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key, storeType)
6778
}.toMap
6879
}
80+
81+
private def resolveSecretPath(
82+
maybeUri: Option[String],
83+
secretKey: String,
84+
configEntry: OptionalConfigEntry[String],
85+
uriType: String): Map[String, String] = {
86+
maybeUri.map(Utils.resolveURI).map { uri =>
87+
val resolvedPath = Option(uri.getScheme).getOrElse("file") match {
88+
case "file" => s"$secretsVolumeMountPath/$secretKey"
89+
case "local" => uri.getPath
90+
case invalid => throw new SparkException(s"$uriType has invalid scheme $invalid must be" +
91+
s" local://, file://, or empty.")
92+
}
93+
(configEntry.key, resolvedPath)
94+
}.toMap
95+
}
6996
}

0 commit comments

Comments (0)