17
17
package org .apache .spark .deploy .kubernetes .submit
18
18
19
19
import java .io .File
20
- import java .util .Collections
20
+ import java .util .{ Collections , UUID }
21
21
22
22
import io .fabric8 .kubernetes .api .model .{ContainerBuilder , EnvVarBuilder , OwnerReferenceBuilder , PodBuilder , QuantityBuilder }
23
23
import io .fabric8 .kubernetes .client .KubernetesClient
24
24
import scala .collection .JavaConverters ._
25
25
26
- import org .apache .spark .{ SparkConf , SparkException }
26
+ import org .apache .spark .SparkConf
27
27
import org .apache .spark .deploy .kubernetes .{ConfigurationUtils , SparkKubernetesClientFactory }
28
28
import org .apache .spark .deploy .kubernetes .config ._
29
29
import org .apache .spark .deploy .kubernetes .constants ._
@@ -43,22 +43,21 @@ import org.apache.spark.util.Utils
43
43
* where different steps of submission should be factored out into separate classes.
44
44
*/
45
45
private [spark] class Client (
46
- appName : String ,
47
- kubernetesAppId : String ,
48
- mainClass : String ,
49
- sparkConf : SparkConf ,
50
- appArgs : Array [String ],
51
- sparkJars : Seq [String ],
52
- sparkFiles : Seq [String ],
53
- waitForAppCompletion : Boolean ,
54
- kubernetesClient : KubernetesClient ,
55
- initContainerComponentsProvider : DriverInitContainerComponentsProvider ,
56
- kubernetesCredentialsMounterProvider : DriverPodKubernetesCredentialsMounterProvider ,
57
- loggingPodStatusWatcher : LoggingPodStatusWatcher )
58
- extends Logging {
59
-
46
+ appName : String ,
47
+ kubernetesResourceNamePrefix : String ,
48
+ kubernetesAppId : String ,
49
+ mainClass : String ,
50
+ sparkConf : SparkConf ,
51
+ appArgs : Array [String ],
52
+ sparkJars : Seq [String ],
53
+ sparkFiles : Seq [String ],
54
+ waitForAppCompletion : Boolean ,
55
+ kubernetesClient : KubernetesClient ,
56
+ initContainerComponentsProvider : DriverInitContainerComponentsProvider ,
57
+ kubernetesCredentialsMounterProvider : DriverPodKubernetesCredentialsMounterProvider ,
58
+ loggingPodStatusWatcher : LoggingPodStatusWatcher ) extends Logging {
60
59
private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME )
61
- .getOrElse(kubernetesAppId )
60
+ .getOrElse(s " $kubernetesResourceNamePrefix -driver " )
62
61
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE )
63
62
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY )
64
63
@@ -86,15 +85,16 @@ private[spark] class Client(
86
85
val parsedCustomLabels = ConfigurationUtils .parseKeyValuePairs(
87
86
customLabels, KUBERNETES_DRIVER_LABELS .key, " labels" )
88
87
require(! parsedCustomLabels.contains(SPARK_APP_ID_LABEL ), s " Label with key " +
89
- s " $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations. " )
90
- require(! parsedCustomLabels.contains(SPARK_APP_NAME_LABEL ), s " Label with key " +
91
- s " $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations. " )
88
+ s " $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
89
+ s " operations. " )
90
+ val parsedCustomAnnotations = ConfigurationUtils .parseKeyValuePairs(
91
+ customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS .key, " annotations" )
92
+ require(! parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION ), s " Annotation with key " +
93
+ s " $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping " +
94
+ s " operations. " )
92
95
val allLabels = parsedCustomLabels ++ Map (
93
96
SPARK_APP_ID_LABEL -> kubernetesAppId,
94
- SPARK_APP_NAME_LABEL -> appName,
95
- SPARK_ROLE_LABEL -> " driver" )
96
- val parsedCustomAnnotations = ConfigurationUtils .parseKeyValuePairs(
97
- customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS .key, " annotations" )
97
+ SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE )
98
98
99
99
val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
100
100
new EnvVarBuilder ()
@@ -140,6 +140,7 @@ private[spark] class Client(
140
140
.withName(kubernetesDriverPodName)
141
141
.addToLabels(allLabels.asJava)
142
142
.addToAnnotations(parsedCustomAnnotations.asJava)
143
+ .addToAnnotations(SPARK_APP_NAME_ANNOTATION , appName)
143
144
.endMetadata()
144
145
.withNewSpec()
145
146
.withRestartPolicy(" Never" )
@@ -186,6 +187,7 @@ private[spark] class Client(
186
187
}
187
188
resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME , kubernetesDriverPodName)
188
189
resolvedSparkConf.set(" spark.app.id" , kubernetesAppId)
190
+ resolvedSparkConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX , kubernetesResourceNamePrefix)
189
191
// We don't need this anymore since we just set the JVM options on the environment
190
192
resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS )
191
193
val resolvedLocalClasspath = containerLocalizedFilesResolver
@@ -234,11 +236,11 @@ private[spark] class Client(
234
236
throw e
235
237
}
236
238
if (waitForAppCompletion) {
237
- logInfo(s " Waiting for application $kubernetesAppId to finish... " )
239
+ logInfo(s " Waiting for application $appName to finish... " )
238
240
loggingPodStatusWatcher.awaitCompletion()
239
- logInfo(s " Application $kubernetesAppId finished. " )
241
+ logInfo(s " Application $appName finished. " )
240
242
} else {
241
- logInfo(s " Deployed Spark application $kubernetesAppId into Kubernetes. " )
243
+ logInfo(s " Deployed Spark application $appName into Kubernetes. " )
242
244
}
243
245
}
244
246
}
@@ -279,15 +281,21 @@ private[spark] object Client {
279
281
val sparkFiles = sparkConf.getOption(" spark.files" )
280
282
.map(_.split(" ," ))
281
283
.getOrElse(Array .empty[String ])
282
- val appName = sparkConf.getOption(" spark.app.name" )
283
- .getOrElse(" spark" )
284
- val kubernetesAppId = s " $appName- $launchTime" .toLowerCase.replaceAll(" \\ ." , " -" )
284
+ val appName = sparkConf.getOption(" spark.app.name" ).getOrElse(" spark" )
285
+ // The resource name prefix is derived from the application name, making it easy to connect the
286
+ // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
287
+ // application the user submitted. However, we can't use the application name in the label, as
288
+ // label values are considerably restrictive, e.g. must be no longer than 63 characters in
289
+ // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
290
+ // requires finding "all pods for this application" should use the kubernetesAppId.
291
+ val kubernetesResourceNamePrefix = s " $appName- $launchTime" .toLowerCase.replaceAll(" \\ ." , " -" )
292
+ val kubernetesAppId = s " spark- ${UUID .randomUUID().toString.replaceAll(" -" , " " )}"
285
293
val namespace = sparkConf.get(KUBERNETES_NAMESPACE )
286
294
val master = resolveK8sMaster(sparkConf.get(" spark.master" ))
287
295
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl (sparkConf)
288
296
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl (
289
297
sparkConf,
290
- kubernetesAppId ,
298
+ kubernetesResourceNamePrefix ,
291
299
namespace,
292
300
sparkJars,
293
301
sparkFiles,
@@ -300,14 +308,16 @@ private[spark] object Client {
300
308
None ,
301
309
None )) { kubernetesClient =>
302
310
val kubernetesCredentialsMounterProvider =
303
- new DriverPodKubernetesCredentialsMounterProviderImpl (sparkConf, kubernetesAppId)
311
+ new DriverPodKubernetesCredentialsMounterProviderImpl (
312
+ sparkConf, kubernetesResourceNamePrefix)
304
313
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION )
305
314
val loggingInterval = Option (sparkConf.get(REPORT_INTERVAL ))
306
315
.filter( _ => waitForAppCompletion)
307
316
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl (
308
- kubernetesAppId , loggingInterval)
317
+ kubernetesResourceNamePrefix , loggingInterval)
309
318
new Client (
310
319
appName,
320
+ kubernetesResourceNamePrefix,
311
321
kubernetesAppId,
312
322
mainClass,
313
323
sparkConf,
0 commit comments