Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 88306b2

Browse files
mccheahash211
authored andcommitted
Allow client certificate PEM for resource staging server. (#257)
1 parent 9d6665c commit 88306b2

File tree

17 files changed

+256
-96
lines changed

17 files changed

+256
-96
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,15 @@ package object config extends Logging {
364364
private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
365365
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
366366
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
367-
.doc("Certificate PEM file to use when having the Kubernetes dependency server" +
367+
.doc("Certificate PEM file to use when having the resource staging server" +
368368
" listen on TLS.")
369369
.stringConf
370370
.createOptional
371+
private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM =
372+
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem")
373+
.doc("Certificate PEM file to use when the client contacts the resource staging server.")
374+
.stringConf
375+
.createOptional
371376

372377
private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
373378
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
2525
import org.apache.spark.{SparkConf, SparkException}
2626
import org.apache.spark.deploy.kubernetes.config._
2727
import org.apache.spark.deploy.kubernetes.constants._
28+
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
2829
import org.apache.spark.internal.Logging
2930
import org.apache.spark.launcher.SparkLauncher
3031
import org.apache.spark.util.Utils
@@ -267,8 +268,9 @@ private[spark] object Client {
267268
val appName = sparkConf.getOption("spark.app.name")
268269
.getOrElse("spark")
269270
val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
271+
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
270272
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
271-
sparkConf, kubernetesAppId, sparkJars, sparkFiles)
273+
sparkConf, kubernetesAppId, sparkJars, sparkFiles, sslOptionsProvider.getSslOptions)
272274
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
273275
val kubernetesCredentialsMounterProvider =
274276
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit.v2
1818

19-
import org.apache.spark.{SecurityManager, SparkConf}
19+
import org.apache.spark.{SparkConf, SSLOptions}
2020
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
2121
import org.apache.spark.deploy.kubernetes.config._
2222
import org.apache.spark.deploy.kubernetes.constants._
@@ -46,12 +46,11 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
4646
sparkConf: SparkConf,
4747
kubernetesAppId: String,
4848
sparkJars: Seq[String],
49-
sparkFiles: Seq[String])
49+
sparkFiles: Seq[String],
50+
resourceStagingServerSslOptions: SSLOptions)
5051
extends DriverInitContainerComponentsProvider {
5152

5253
private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
53-
private val resourceStagingServerSslOptions = new SecurityManager(sparkConf)
54-
.getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE)
5554
private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
5655
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
5756
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}
2222
import scala.collection.JavaConverters._
2323

2424
import org.apache.spark.SSLOptions
25-
import org.apache.spark.deploy.kubernetes.constants._
2625

2726
private[spark] trait SubmittedDependencySecretBuilder {
2827
/**

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -414,18 +414,20 @@ private[spark] object KubernetesSparkRestServer {
414414
// If keystore password isn't set but we're using PEM files, generate a password
415415
.orElse(parsedArguments.keyPemFile.map(_ => randomPassword()))
416416
val resolvedKeyStore = parsedArguments.keyStoreFile.map(new File(_)).orElse(
417-
parsedArguments.keyPemFile.map(keyPemFile => {
418-
parsedArguments.certPemFile.map(certPemFile => {
419-
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
420-
new File(keyPemFile),
421-
new File(certPemFile),
422-
"provided-key",
423-
keyStorePassword,
424-
keyPassword,
425-
parsedArguments.keyStoreType)
426-
})
427-
}).getOrElse(throw new SparkException("When providing PEM files to set up TLS for the" +
428-
" submission server, both the key and the certificate must be specified.")))
417+
for {
418+
keyPemFile <- parsedArguments.keyPemFile
419+
certPemFile <- parsedArguments.certPemFile
420+
resolvedKeyStorePassword <- keyStorePassword
421+
resolvedKeyPassword <- keyPassword
422+
} yield {
423+
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
424+
new File(keyPemFile),
425+
new File(certPemFile),
426+
"provided-key",
427+
resolvedKeyStorePassword,
428+
resolvedKeyPassword,
429+
parsedArguments.keyStoreType)
430+
})
429431
new SSLOptions(
430432
enabled = true,
431433
keyStore = resolvedKeyStore,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ private[spark] object PemsToKeyStoreConverter {
4343
keyPemFile: File,
4444
certPemFile: File,
4545
keyAlias: String,
46-
keyStorePassword: Option[String],
47-
keyPassword: Option[String],
46+
keyStorePassword: String,
47+
keyPassword: String,
4848
keyStoreType: Option[String]): File = {
4949
require(keyPemFile.isFile, s"Key PEM file provided at ${keyPemFile.getAbsolutePath}" +
5050
" does not exist or is not a file.")
@@ -58,12 +58,12 @@ private[spark] object PemsToKeyStoreConverter {
5858
keyStore.setKeyEntry(
5959
keyAlias,
6060
privateKey,
61-
keyPassword.map(_.toCharArray).orNull,
61+
keyPassword.toCharArray,
6262
certificates)
6363
val keyStoreDir = Utils.createTempDir("temp-keystores")
6464
val keyStoreFile = new File(keyStoreDir, s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
6565
Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { storeStream =>
66-
keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull)
66+
keyStore.store(storeStream, keyStorePassword.toCharArray)
6767
}
6868
keyStoreFile
6969
}
@@ -81,6 +81,20 @@ private[spark] object PemsToKeyStoreConverter {
8181
trustStore
8282
}
8383

84+
def convertCertPemToTempTrustStoreFile(
85+
certPemFile: File,
86+
trustStorePassword: String,
87+
trustStoreType: Option[String]): File = {
88+
val trustStore = convertCertPemToTrustStore(certPemFile, trustStoreType)
89+
val tempTrustStoreDir = Utils.createTempDir(namePrefix = "temp-trustStore")
90+
val tempTrustStoreFile = new File(tempTrustStoreDir,
91+
s"trustStore.${trustStoreType.getOrElse(KeyStore.getDefaultType)}")
92+
Utils.tryWithResource(new FileOutputStream(tempTrustStoreFile)) {
93+
trustStore.store(_, trustStorePassword.toCharArray)
94+
}
95+
tempTrustStoreFile
96+
}
97+
8498
private def withPemParsedFromFile[T](pemFile: File)(f: (PEMParser => T)): T = {
8599
Utils.tryWithResource(new FileInputStream(pemFile)) { pemStream =>
86100
Utils.tryWithResource(new InputStreamReader(pemStream, Charsets.UTF_8)) { pemReader =>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import retrofit2.{Call, Callback, Response}
2828
import scala.concurrent.{ExecutionContext, Future}
2929
import scala.concurrent.duration.Duration
3030

31-
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
31+
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SSLOptions}
3232
import org.apache.spark.deploy.SparkHadoopUtil
3333
import org.apache.spark.deploy.kubernetes.config._
3434
import org.apache.spark.deploy.kubernetes.CompressionUtils
@@ -95,7 +95,7 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer(
9595
sparkConf: SparkConf,
9696
retrofitClientFactory: RetrofitClientFactory,
9797
fileFetcher: FileFetcher,
98-
securityManager: SparkSecurityManager) extends Logging {
98+
resourceStagingServerSslOptions: SSLOptions) extends Logging {
9999

100100
private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
101101
ThreadUtils.newDaemonCachedThreadPool("download-executor"))
@@ -177,9 +177,10 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer(
177177
maybeResourceId.foreach { resourceId =>
178178
require(resourceSecretLocation.isFile, errMessageOnSecretNotAFile)
179179
require(resourceDownloadDir.isDirectory, errMessageOnDownloadDirNotADirectory)
180-
val sslOptions = securityManager.getSSLOptions("kubernetes.resourceStagingServer")
181180
val service = retrofitClientFactory.createRetrofitClient(
182-
resourceStagingServerUri, classOf[ResourceStagingServiceRetrofit], sslOptions)
181+
resourceStagingServerUri,
182+
classOf[ResourceStagingServiceRetrofit],
183+
resourceStagingServerSslOptions)
183184
val resourceSecret = Files.toString(resourceSecretLocation, Charsets.UTF_8)
184185
val downloadResourceCallback = new DownloadTarGzCallback(resourceDownloadDir)
185186
logInfo(downloadStartMessage)
@@ -219,12 +220,14 @@ object KubernetesSparkDependencyDownloadInitContainer extends Logging {
219220
new SparkConf(true)
220221
}
221222
val securityManager = new SparkSecurityManager(sparkConf)
223+
val resourceStagingServerSslOptions =
224+
new ResourceStagingServerSslOptionsProviderImpl(sparkConf).getSslOptions
222225
val fileFetcher = new FileFetcherImpl(sparkConf, securityManager)
223226
new KubernetesSparkDependencyDownloadInitContainer(
224227
sparkConf,
225228
RetrofitClientFactoryImpl,
226229
fileFetcher,
227-
securityManager).run()
230+
resourceStagingServerSslOptions).run()
228231
logInfo("Finished downloading application dependencies.")
229232
}
230233
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package org.apache.spark.deploy.rest.kubernetes.v2
1818

1919
import java.io.File
20+
import java.security.SecureRandom
2021

2122
import com.google.common.base.Charsets
2223
import com.google.common.io.Files
24+
import org.apache.commons.lang3.RandomStringUtils
2325

2426
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
2527
import org.apache.spark.deploy.kubernetes.config._
@@ -32,63 +34,98 @@ private[spark] trait ResourceStagingServerSslOptionsProvider {
3234

3335
private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: SparkConf)
3436
extends ResourceStagingServerSslOptionsProvider with Logging {
37+
38+
private val SECURE_RANDOM = new SecureRandom()
39+
3540
def getSslOptions: SSLOptions = {
3641
val baseSslOptions = new SparkSecurityManager(sparkConf)
37-
.getSSLOptions("kubernetes.resourceStagingServer")
42+
.getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE)
3843
val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM)
39-
val maybeCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
44+
val maybeServerCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
4045
val maybeKeyStorePasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE)
4146
val maybeKeyPasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE)
47+
val maybeClientCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM)
4248

4349
logSslConfigurations(
44-
baseSslOptions, maybeKeyPem, maybeCertPem, maybeKeyStorePasswordFile, maybeKeyPasswordFile)
50+
baseSslOptions,
51+
maybeKeyPem,
52+
maybeServerCertPem,
53+
maybeKeyStorePasswordFile,
54+
maybeKeyPasswordFile,
55+
maybeClientCertPem)
4556

4657
requireNandDefined(baseSslOptions.keyStore, maybeKeyPem,
4758
"Shouldn't provide both key PEM and keyStore files for TLS.")
48-
requireNandDefined(baseSslOptions.keyStore, maybeCertPem,
59+
requireNandDefined(baseSslOptions.keyStore, maybeServerCertPem,
4960
"Shouldn't provide both certificate PEM and keyStore files for TLS.")
5061
requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile,
5162
"Shouldn't provide both the keyStore password value and the keyStore password file.")
5263
requireNandDefined(baseSslOptions.keyPassword, maybeKeyPasswordFile,
5364
"Shouldn't provide both the keyStore key password value and the keyStore key password file.")
5465
requireBothOrNeitherDefined(
5566
maybeKeyPem,
56-
maybeCertPem,
67+
maybeServerCertPem,
5768
"When providing a certificate PEM file, the key PEM file must also be provided.",
5869
"When providing a key PEM file, the certificate PEM file must also be provided.")
70+
requireNandDefined(baseSslOptions.trustStore, maybeClientCertPem,
71+
"Shouldn't provide both the trustStore and a client certificate PEM file.")
5972

6073
val resolvedKeyStorePassword = baseSslOptions.keyStorePassword
6174
.orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile =>
6275
safeFileToString(keyStorePasswordFile, "KeyStore password file")
6376
})
77+
.orElse(maybeKeyPem.map { _ => randomPassword()})
6478
val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword
6579
.orElse(maybeKeyPasswordFile.map { keyPasswordFile =>
6680
safeFileToString(keyPasswordFile, "KeyStore key password file")
6781
})
68-
val resolvedKeyStore = baseSslOptions.keyStore
69-
.orElse(maybeKeyPem.map { keyPem =>
82+
.orElse(maybeKeyPem.map { _ => randomPassword()})
83+
val resolvedKeyStore = baseSslOptions.keyStore.orElse {
84+
for {
85+
keyPem <- maybeKeyPem
86+
certPem <- maybeServerCertPem
87+
keyStorePassword <- resolvedKeyStorePassword
88+
keyPassword <- resolvedKeyStoreKeyPassword
89+
} yield {
7090
val keyPemFile = new File(keyPem)
71-
val certPemFile = new File(maybeCertPem.get)
91+
val certPemFile = new File(certPem)
7292
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
7393
keyPemFile,
7494
certPemFile,
7595
"key",
76-
resolvedKeyStorePassword,
77-
resolvedKeyStoreKeyPassword,
96+
keyStorePassword,
97+
keyPassword,
7898
baseSslOptions.keyStoreType)
79-
})
99+
}
100+
}
101+
val resolvedTrustStorePassword = baseSslOptions.trustStorePassword
102+
.orElse(maybeClientCertPem.map( _ => "defaultTrustStorePassword"))
103+
val resolvedTrustStore = baseSslOptions.trustStore.orElse {
104+
for {
105+
clientCertPem <- maybeClientCertPem
106+
trustStorePassword <- resolvedTrustStorePassword
107+
} yield {
108+
val certPemFile = new File(clientCertPem)
109+
PemsToKeyStoreConverter.convertCertPemToTempTrustStoreFile(
110+
certPemFile,
111+
trustStorePassword,
112+
baseSslOptions.trustStoreType)
113+
}
114+
}
80115
baseSslOptions.copy(
81116
keyStore = resolvedKeyStore,
82117
keyStorePassword = resolvedKeyStorePassword,
83-
keyPassword = resolvedKeyStoreKeyPassword)
118+
keyPassword = resolvedKeyStoreKeyPassword,
119+
trustStore = resolvedTrustStore)
84120
}
85121

86122
private def logSslConfigurations(
87123
baseSslOptions: SSLOptions,
88124
maybeKeyPem: Option[String],
89-
maybeCertPem: Option[String],
125+
maybeServerCertPem: Option[String],
90126
maybeKeyStorePasswordFile: Option[String],
91-
maybeKeyPasswordFile: Option[String]) = {
127+
maybeKeyPasswordFile: Option[String],
128+
maybeClientCertPem: Option[String]) = {
92129
logDebug("The following SSL configurations were provided for the resource staging server:")
93130
logDebug(s"KeyStore File: ${baseSslOptions.keyStore.map(_.getAbsolutePath).getOrElse("N/A")}")
94131
logDebug("KeyStore Password: " +
@@ -99,7 +136,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar
99136
logDebug(s"Key Password File: ${maybeKeyPasswordFile.getOrElse("N/A")}")
100137
logDebug(s"KeyStore Type: ${baseSslOptions.keyStoreType.getOrElse("N/A")}")
101138
logDebug(s"Key PEM: ${maybeKeyPem.getOrElse("N/A")}")
102-
logDebug(s"Certificate PEM: ${maybeCertPem.getOrElse("N/A")}")
139+
logDebug(s"Server-side certificate PEM: ${maybeServerCertPem.getOrElse("N/A")}")
140+
logDebug(s"Client-side certificate PEM: ${maybeClientCertPem.getOrElse("N/A")}")
103141
}
104142

105143
private def requireBothOrNeitherDefined(
@@ -130,4 +168,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar
130168
}
131169
Files.toString(file, Charsets.UTF_8)
132170
}
171+
172+
private def randomPassword(): String = {
173+
RandomStringUtils.random(1024, 0, Integer.MAX_VALUE, false, false, null, SECURE_RANDOM)
174+
}
133175
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3Certi
3030
import org.bouncycastle.openssl.jcajce.JcaPEMWriter
3131
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
3232

33+
import org.apache.spark.deploy.kubernetes.submit.v2.{KeyAndCertPem, KeyStoreAndTrustStore}
3334
import org.apache.spark.util.Utils
3435

3536
private[spark] object SSLUtils {
@@ -38,7 +39,7 @@ private[spark] object SSLUtils {
3839
ipAddress: String,
3940
keyStorePassword: String,
4041
keyPassword: String,
41-
trustStorePassword: String): (File, File) = {
42+
trustStorePassword: String): KeyStoreAndTrustStore = {
4243
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
4344
keyPairGenerator.initialize(512)
4445
val keyPair = keyPairGenerator.generateKeyPair()
@@ -60,10 +61,10 @@ private[spark] object SSLUtils {
6061
Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
6162
trustStore.store(_, trustStorePassword.toCharArray)
6263
}
63-
(keyStoreFile, trustStoreFile)
64+
KeyStoreAndTrustStore(keyStoreFile, trustStoreFile)
6465
}
6566

66-
def generateKeyCertPemPair(ipAddress: String): (File, File) = {
67+
def generateKeyCertPemPair(ipAddress: String): KeyAndCertPem = {
6768
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
6869
keyPairGenerator.initialize(512)
6970
val keyPair = keyPairGenerator.generateKeyPair()
@@ -90,7 +91,7 @@ private[spark] object SSLUtils {
9091
}
9192
}
9293
}
93-
(keyPemFile, certPemFile)
94+
KeyAndCertPem(keyPemFile, certPemFile)
9495
}
9596

9697
private def generateCertificate(ipAddress: String, keyPair: KeyPair): X509Certificate = {

0 commit comments

Comments
 (0)