This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 9d6665c

mccheah authored and ash211 committed

Support driver pod kubernetes credentials mounting in V2 submission (#246)

1 parent 6882a1b, commit 9d6665c

20 files changed: +632 -181 lines changed
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesCredentials.scala

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+case class KubernetesCredentials(
+    oauthTokenBase64: Option[String],
+    caCertDataBase64: Option[String],
+    clientKeyDataBase64: Option[String],
+    clientCertDataBase64: Option[String])
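
As a rough illustration of how this case class is meant to be populated (a sketch, not code from this commit; the token value and file path below are hypothetical), each credential is carried as an optional base64-encoded string, mirroring the encoding applied by DriverPodKubernetesCredentialsProvider later in this change:

  import java.io.File

  import com.google.common.base.Charsets
  import com.google.common.io.{BaseEncoding, Files}

  import org.apache.spark.deploy.kubernetes.KubernetesCredentials

  // Hypothetical submitter-local inputs; any of them may be absent.
  val maybeOAuthToken: Option[String] = Some("my-token")
  val maybeCaCertFile: Option[File] = Some(new File("/tmp/ca.crt"))

  val credentials = KubernetesCredentials(
    // Raw token text is base64-encoded before being carried in the submission.
    oauthTokenBase64 = maybeOAuthToken.map(t => BaseEncoding.base64().encode(t.getBytes(Charsets.UTF_8))),
    // File-based credentials are read from disk and base64-encoded as well.
    caCertDataBase64 = maybeCaCertFile.map(f => BaseEncoding.base64().encode(Files.toByteArray(f))),
    clientKeyDataBase64 = None,
    clientCertDataBase64 = None)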

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 16 additions & 4 deletions
@@ -120,30 +120,42 @@ package object config extends Logging {
   private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile")
       .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" +
-        " against Kubernetes.")
+        " against Kubernetes. Typically this is configured by spark-submit from mounting a" +
+        " secret from the submitting machine into the pod, and hence this configuration is marked" +
+        " as internal, but this can also be set manually to use a certificate that is mounted" +
+        " into the driver pod via other means.")
       .stringConf
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile")
       .doc("Path on the driver pod's disk containing the client key file to use when" +
-        " authenticating against Kubernetes.")
+        " authenticating against Kubernetes. Typically this is configured by spark-submit from" +
+        " mounting a secret from the submitting machine into the pod, and hence this" +
+        " configuration is marked as internal, but this can also be set manually to" +
+        " use a key file that is mounted into the driver pod via other means.")
       .internal()
       .stringConf
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile")
       .doc("Path on the driver pod's disk containing the client cert file to use when" +
-        " authenticating against Kubernetes.")
+        " authenticating against Kubernetes. Typically this is configured by spark-submit from" +
+        " mounting a secret from the submitting machine into the pod, and hence this" +
+        " configuration is marked as internal, but this can also be set manually to" +
+        " use a certificate that is mounted into the driver pod via other means.")
       .internal()
       .stringConf
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile")
       .doc("Path on the driver pod's disk containing the OAuth token file to use when" +
-        " authenticating against Kubernetes.")
+        " authenticating against Kubernetes. Typically this is configured by spark-submit from" +
+        " mounting a secret from the submitting machine into the pod, and hence this" +
+        " configuration is marked as internal, but this can also be set manually to" +
+        " use a token that is mounted into the driver pod via other means.")
       .internal()
       .stringConf
       .createOptional
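
To show how these entries are meant to be consumed (a sketch under assumptions, not code from this commit: it presumes calling code inside the org.apache.spark package, since the entries are private[spark], and the sample path simply reuses DRIVER_CREDENTIALS_CA_CERT_PATH from constants.scala below):

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.kubernetes.config._

  // spark-submit normally points these entries at the secret volume it mounts into the driver pod,
  // but they can also be set by hand to a path mounted through other means.
  val conf = new SparkConf(false)
    .set(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE.key, "/mnt/secrets/spark-kubernetes-credentials/ca-cert")

  // Each entry is created with createOptional, so an unset credential resolves to None.
  val maybeCaCertFile: Option[String] = conf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE)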

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 16 additions & 0 deletions
@@ -38,6 +38,22 @@ package object constants {
   private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
   private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem"
   private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem"
+  private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
+    "/mnt/secrets/spark-kubernetes-credentials"
+  private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
+  private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
+  private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
+
 
   // Default and fixed ports
   private[spark] val SUBMISSION_SERVER_PORT = 7077
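
For quick reference, each of the path constants above is just the base directory joined with the per-credential secret key, which follows directly from the values in the diff:

  import org.apache.spark.deploy.kubernetes.constants._

  // DRIVER_CREDENTIALS_SECRETS_BASE_DIR + "/" + DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME
  assert(DRIVER_CREDENTIALS_CA_CERT_PATH == "/mnt/secrets/spark-kubernetes-credentials/ca-cert")
  assert(DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH == "/mnt/secrets/spark-kubernetes-credentials/oauth-token")
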
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala

Lines changed: 7 additions & 4 deletions
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.submit.v1
+package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
 
+import com.google.common.base.Charsets
 import com.google.common.io.{BaseEncoding, Files}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.KubernetesCredentials
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials
 import org.apache.spark.internal.config.OptionalConfigEntry
 
 private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) {
@@ -38,15 +39,17 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf
       require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty,
         "Cannot specify both a service account and a driver pod client cert file.")
     }
-    val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN)
+    val oauthTokenBase64 = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).map { token =>
+      BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8))
+    }
     val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE,
       s"Driver CA cert file provided at %s does not exist or is not a file.")
     val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
       s"Driver client key file provided at %s does not exist or is not a file.")
     val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
       s"Driver client cert file provided at %s does not exist or is not a file.")
     KubernetesCredentials(
-      oauthToken = oauthToken,
+      oauthTokenBase64 = oauthTokenBase64,
       caCertDataBase64 = caCertDataBase64,
       clientKeyDataBase64 = clientKeyDataBase64,
       clientCertDataBase64 = clientCertDataBase64)
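
A short sketch (not part of the commit) of the round trip implied by this change: the provider now base64-encodes the raw OAuth token the same way the file-based credentials are encoded, and a consumer can decode it back:

  import com.google.common.base.Charsets
  import com.google.common.io.BaseEncoding

  val token = "example-oauth-token"  // hypothetical value read from KUBERNETES_DRIVER_OAUTH_TOKEN

  // Same encoding applied by DriverPodKubernetesCredentialsProvider above.
  val oauthTokenBase64 = BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8))

  // Decoding on the receiving side recovers the original token text.
  val decoded = new String(BaseEncoding.base64().decode(oauthTokenBase64), Charsets.UTF_8)
  assert(decoded == token)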

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala

Lines changed: 3 additions & 3 deletions
@@ -30,11 +30,11 @@ import org.apache.commons.codec.binary.Base64
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.kubernetes.CompressionUtils
+import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
-import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
+import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
+import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala

Lines changed: 19 additions & 7 deletions
@@ -48,7 +48,9 @@ private[spark] class Client(
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
     kubernetesClientProvider: SubmissionKubernetesClientProvider,
-    initContainerComponentsProvider: DriverInitContainerComponentsProvider) extends Logging {
+    initContainerComponentsProvider: DriverInitContainerComponentsProvider,
+    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
+  extends Logging {
 
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
     .getOrElse(kubernetesAppId)
@@ -133,18 +135,22 @@
       .provideInitContainerBootstrap()
       .bootstrapInitContainerAndVolumes(driverContainer.getName, basePod)
 
-    val driverOwnedResources = Seq(initContainerConfigMap) ++
-      maybeSubmittedDependenciesSecret.toSeq
-
     val containerLocalizedFilesResolver = initContainerComponentsProvider
       .provideContainerLocalizedFilesResolver()
     val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
     val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
 
     val executorInitContainerConfiguration = initContainerComponentsProvider
       .provideExecutorInitContainerConfiguration()
-    val resolvedSparkConf = executorInitContainerConfiguration
+    val sparkConfWithExecutorInit = executorInitContainerConfiguration
       .configureSparkConfForExecutorInitContainer(sparkConf)
+    val credentialsMounter = kubernetesCredentialsMounterProvider
+      .getDriverPodKubernetesCredentialsMounter()
+    val credentialsSecret = credentialsMounter.createCredentialsSecret()
+    val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials(
+      podWithInitContainer, driverContainer.getName, credentialsSecret)
+    val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations(
+      sparkConfWithExecutorInit)
     if (resolvedSparkJars.nonEmpty) {
       resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
     }
@@ -166,7 +172,7 @@
     val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
       case (confKey, confValue) => s"-D$confKey=$confValue"
     }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
-    val resolvedDriverPod = podWithInitContainer.editSpec()
+    val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
       .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
       .addNewEnv()
         .withName(ENV_MOUNTED_CLASSPATH)
@@ -181,6 +187,9 @@
         .build()
     val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
     try {
+      val driverOwnedResources = Seq(initContainerConfigMap) ++
+        maybeSubmittedDependenciesSecret.toSeq ++
+        credentialsSecret.toSeq
       val driverPodOwnerReference = new OwnerReferenceBuilder()
         .withName(createdDriverPod.getMetadata.getName)
         .withApiVersion(createdDriverPod.getApiVersion)
@@ -261,6 +270,8 @@ private[spark] object Client {
     val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
       sparkConf, kubernetesAppId, sparkJars, sparkFiles)
     val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
+    val kubernetesCredentialsMounterProvider =
+      new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
     new Client(
       appName,
       kubernetesAppId,
@@ -270,6 +281,7 @@
       sparkJars,
       sparkFiles,
       kubernetesClientProvider,
-      initContainerComponentsProvider).run()
+      initContainerComponentsProvider,
+      kubernetesCredentialsMounterProvider).run()
   }
 }
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounter.scala

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.v2
+
+import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.KubernetesCredentials
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.internal.config.OptionalConfigEntry
+
+private[spark] trait DriverPodKubernetesCredentialsMounter {
+
+  /**
+   * Set fields on the Spark configuration that indicate where the driver pod is
+   * to find its Kubernetes credentials for requesting executors.
+   */
+  def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf
+
+  /**
+   * Create the Kubernetes secret object that correspond to the driver's credentials
+   * that have to be created and mounted into the driver pod. The single Secret
+   * object contains all of the data entries for the driver pod's Kubernetes
+   * credentials. Returns empty if no secrets are to be mounted.
+   */
+  def createCredentialsSecret(): Option[Secret]
+
+  /**
+   * Mount any Kubernetes credentials from the submitting machine's disk into the driver pod. The
+   * secret that is passed in here should have been created from createCredentialsSecret so that
+   * the implementation does not need to hold its state.
+   */
+  def mountDriverKubernetesCredentials(
+      originalPodSpec: PodBuilder,
+      driverContainerName: String,
+      credentialsSecret: Option[Secret]): PodBuilder
+}
+
+private[spark] class DriverPodKubernetesCredentialsMounterImpl(
+    kubernetesAppId: String,
+    submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials,
+    maybeUserSpecifiedMountedClientKeyFile: Option[String],
+    maybeUserSpecifiedMountedClientCertFile: Option[String],
+    maybeUserSpecifiedMountedOAuthTokenFile: Option[String],
+    maybeUserSpecifiedMountedCaCertFile: Option[String])
+  extends DriverPodKubernetesCredentialsMounter {
+
+  override def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf = {
+    val resolvedMountedClientKeyFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedClientKeyFile,
+      submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
+    val resolvedMountedClientCertFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedClientCertFile,
+      submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
+    val resolvedMountedCaCertFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedCaCertFile,
+      submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
+      DRIVER_CREDENTIALS_CA_CERT_PATH)
+    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedOAuthTokenFile,
+      submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
+    val sparkConfWithCredentialLocations = sparkConf.clone()
+      .setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, resolvedMountedCaCertFile)
+      .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, resolvedMountedClientKeyFile)
+      .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, resolvedMountedClientCertFile)
+      .setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, resolvedMountedOAuthTokenFile)
+    sparkConfWithCredentialLocations.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
+      sparkConfWithCredentialLocations.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
+    }
+    sparkConfWithCredentialLocations.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
+      sparkConfWithCredentialLocations.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
+    }
+    sparkConfWithCredentialLocations
+  }
+
+  override def createCredentialsSecret(): Option[Secret] = {
+    val allSecretData =
+      resolveSecretData(
+        maybeUserSpecifiedMountedClientKeyFile,
+        submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
+      resolveSecretData(
+        maybeUserSpecifiedMountedClientCertFile,
+        submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
+      resolveSecretData(
+        maybeUserSpecifiedMountedCaCertFile,
+        submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
+        DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
+      resolveSecretData(
+        maybeUserSpecifiedMountedOAuthTokenFile,
+        submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
+        DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
+    if (allSecretData.isEmpty) {
+      None
+    } else {
+      Some(new SecretBuilder()
+        .withNewMetadata().withName(s"$kubernetesAppId-kubernetes-credentials").endMetadata()
+        .withData(allSecretData.asJava)
+        .build())
+    }
+  }
+
+  override def mountDriverKubernetesCredentials(
+      originalPodSpec: PodBuilder,
+      driverContainerName: String,
+      credentialsSecret: Option[Secret]): PodBuilder = {
+    credentialsSecret.map { secret =>
+      originalPodSpec.editSpec()
+        .addNewVolume()
+          .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+          .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+          .endVolume()
+        .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName))
+          .addNewVolumeMount()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+            .endVolumeMount()
+          .endContainer()
+        .endSpec()
+    }.getOrElse(originalPodSpec)
+  }
+
+  private def resolveSecretLocation(
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      mountedCanonicalLocation: String): Option[String] = {
+    mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => {
+      mountedCanonicalLocation
+    }))
+  }
+
+  private def resolveSecretData(
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      secretName: String): Map[String, String] = {
+    mountedUserSpecified.map { _ => Map.empty[String, String]}
+      .getOrElse {
+        valueMountedFromSubmitter.map { valueBase64 =>
+          Map(secretName -> valueBase64)
+        }.getOrElse(Map.empty[String, String])
+      }
+  }
+
+  private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
+    new OptionSettableSparkConf(sparkConf)
+  }
+}
+
+private class OptionSettableSparkConf(sparkConf: SparkConf) {
+  def setOption[T](configEntry: OptionalConfigEntry[T], option: Option[T]): SparkConf = {
+    option.map( opt => {
+      sparkConf.set(configEntry, opt)
+    }).getOrElse(sparkConf)
+  }
+}
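
To make the end-to-end flow concrete, here is a hedged usage sketch of the mounter with hypothetical inputs (it assumes calling code inside the org.apache.spark.deploy.kubernetes.submit.v2 package, since the classes are private[spark]; the container and app names are made up, and the v2 Client actually obtains the mounter through DriverPodKubernetesCredentialsMounterProviderImpl as shown earlier):

  import io.fabric8.kubernetes.api.model.PodBuilder

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.kubernetes.KubernetesCredentials

  // Hypothetical inputs: an app id, submitter-local credentials (only an OAuth token here),
  // and no user-specified pre-mounted credential files.
  val mounter = new DriverPodKubernetesCredentialsMounterImpl(
    kubernetesAppId = "spark-pi-1234",
    submitterLocalDriverPodKubernetesCredentials = KubernetesCredentials(
      oauthTokenBase64 = Some("bXktdG9rZW4="),
      caCertDataBase64 = None,
      clientKeyDataBase64 = None,
      clientCertDataBase64 = None),
    maybeUserSpecifiedMountedClientKeyFile = None,
    maybeUserSpecifiedMountedClientCertFile = None,
    maybeUserSpecifiedMountedOAuthTokenFile = None,
    maybeUserSpecifiedMountedCaCertFile = None)

  // 1. Build the secret carrying the base64-encoded credentials (None if there is nothing to mount).
  val credentialsSecret = mounter.createCredentialsSecret()

  // 2. Mount the secret volume into the driver container of a pod spec under construction.
  val basePod = new PodBuilder()
    .withNewMetadata().withName("driver").endMetadata()
    .withNewSpec().addNewContainer().withName("spark-kubernetes-driver").endContainer().endSpec()
  val podWithCreds = mounter.mountDriverKubernetesCredentials(
    basePod, "spark-kubernetes-driver", credentialsSecret)

  // 3. Point the driver's SparkConf at the in-pod credential locations (the mounted.* entries).
  val resolvedConf = mounter.setDriverPodKubernetesCredentialLocations(new SparkConf(false))

Note that the user-specified mounted-file arguments take precedence: when one is provided, the corresponding credential is neither added to the secret nor re-pointed at the canonical in-pod path, which matches resolveSecretLocation and resolveSecretData above.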
