This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit f005268

mccheah authored and ash211 committed
Download remotely-located resources on driver and executor startup via init-container (#251)
* Download remotely-located resources on driver startup. Use init-container in executors.
* Fix owner reference slightly
* Clean up config
* Don't rely too heavily on conventions that can change
* Fix flaky test
* Tidy up file resolver
* Whitespace arrangement
* Indentation change
* Fix more indentation
* Consolidate init container component providers
* Minor method signature and comment changes
* Rename class for consistency
* Resolve conflicts
* Fix flaky test
* Add some tests and some refactoring.
* Make naming consistent for Staged -> Submitted
* Add unit test for the submission client.
* Refine expectations
* Rename variables and fix typos
* Address more comments. Remove redundant SingleKeyConfigMap.
* Minor test adjustments.
* Add another test
* Fix conflicts.
1 parent e9da549 · commit f005268

46 files changed: +2620 −1233 lines

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+private[spark] trait InitContainerResourceStagingServerSecretPlugin {
+
+  /**
+   * Configure the init-container to mount the secret files that allow it to retrieve dependencies
+   * from a resource staging server.
+   */
+  def mountResourceStagingServerSecretIntoInitContainer(
+      initContainer: ContainerBuilder): ContainerBuilder
+
+  /**
+   * Configure the pod to attach a Secret volume which hosts secret files allowing the
+   * init-container to retrieve dependencies from the resource staging server.
+   */
+  def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
+}
+
+private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
+    initContainerSecretName: String,
+    initContainerSecretMountPath: String)
+  extends InitContainerResourceStagingServerSecretPlugin {
+
+  override def mountResourceStagingServerSecretIntoInitContainer(
+      initContainer: ContainerBuilder): ContainerBuilder = {
+    initContainer.addNewVolumeMount()
+      .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+      .withMountPath(initContainerSecretMountPath)
+      .endVolumeMount()
+  }
+
+  override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
+    basePod.editSpec()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+        .withNewSecret()
+          .withSecretName(initContainerSecretName)
+          .endSecret()
+        .endVolume()
+      .endSpec()
+  }
+}
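For context, a minimal usage sketch of the new plugin, not part of the commit itself: the secret name and mount path below are illustrative placeholders, and INIT_CONTAINER_SECRET_VOLUME_NAME is assumed to come from constants._ as in the file above.

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Hypothetical values; real callers derive these from Spark configuration.
val plugin = new InitContainerResourceStagingServerSecretPluginImpl(
  initContainerSecretName = "staging-server-secret",
  initContainerSecretMountPath = "/mnt/secrets/spark-init")

// Mount the secret files into the init-container...
val initContainer: ContainerBuilder = plugin.mountResourceStagingServerSecretIntoInitContainer(
  new ContainerBuilder().withName("spark-init"))

// ...and attach the backing Secret volume to the pod that hosts it.
val podWithSecret: PodBuilder = plugin.addResourceStagingServerSecretVolumeToPod(
  new PodBuilder().withNewSpec().endSpec())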
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}
+
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, InitContainerUtil}
+
+private[spark] trait SparkPodInitContainerBootstrap {
+  /**
+   * Bootstraps an init-container that downloads dependencies to be used by a main container.
+   * Note that this primarily assumes that the init-container's configuration is being provided
+   * by a ConfigMap that was installed by some other component; that is, the implementation
+   * here makes no assumptions about how the init-container is specifically configured. For
+   * example, this class is unaware if the init-container is fetching remote dependencies or if
+   * it is fetching dependencies from a resource staging server.
+   */
+  def bootstrapInitContainerAndVolumes(
+      mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
+}
+
+private[spark] class SparkPodInitContainerBootstrapImpl(
+    initContainerImage: String,
+    jarsDownloadPath: String,
+    filesDownloadPath: String,
+    downloadTimeoutMinutes: Long,
+    initContainerConfigMapName: String,
+    initContainerConfigMapKey: String,
+    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
+  extends SparkPodInitContainerBootstrap {
+
+  override def bootstrapInitContainerAndVolumes(
+      mainContainerName: String,
+      originalPodSpec: PodBuilder): PodBuilder = {
+    val sharedVolumeMounts = Seq[VolumeMount](
+      new VolumeMountBuilder()
+        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
+        .withMountPath(jarsDownloadPath)
+        .build(),
+      new VolumeMountBuilder()
+        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
+        .withMountPath(filesDownloadPath)
+        .build())
+
+    val initContainer = new ContainerBuilder()
+      .withName(s"spark-init")
+      .withImage(initContainerImage)
+      .withImagePullPolicy("IfNotPresent")
+      .addNewVolumeMount()
+        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
+        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
+        .endVolumeMount()
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
+    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
+      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
+    }.getOrElse(initContainer).build()
+    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
+        originalPodSpec, resolvedInitContainer)
+      .editSpec()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
+        .withNewConfigMap()
+          .withName(initContainerConfigMapName)
+          .addNewItem()
+            .withKey(initContainerConfigMapKey)
+            .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
+            .endItem()
+          .endConfigMap()
+        .endVolume()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
+        .withEmptyDir(new EmptyDirVolumeSource())
+        .endVolume()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
+        .withEmptyDir(new EmptyDirVolumeSource())
+        .endVolume()
+      .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
+        .addToVolumeMounts(sharedVolumeMounts: _*)
+        .endContainer()
+      .endSpec()
+    resourceStagingServerSecretPlugin.map { plugin =>
+      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
+    }.getOrElse(podWithBasicVolumes)
+  }
+}
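As a rough illustration of how this bootstrap might be driven (a sketch under assumed names, not code from this commit): the image tag, ConfigMap name, and key below are placeholders, and passing None for the secret plugin covers the case where only remote dependencies are fetched.

import io.fabric8.kubernetes.api.model.PodBuilder

val bootstrap = new SparkPodInitContainerBootstrapImpl(
  initContainerImage = "spark-init:2.1.0",                 // assumed image tag
  jarsDownloadPath = "/var/spark-data/spark-submitted-jars",
  filesDownloadPath = "/var/spark-data/spark-submitted-files",
  downloadTimeoutMinutes = 5L,
  initContainerConfigMapName = "spark-init-config",        // assumed ConfigMap name
  initContainerConfigMapKey = "download-submitted-files",  // assumed key
  resourceStagingServerSecretPlugin = None)                // no staging server secret

// A bare pod with just the main container; the bootstrap appends the
// init-container, adds the config map and emptyDir volumes, and attaches the
// shared download mounts to the matching main container.
val basePod = new PodBuilder()
  .withNewMetadata().withName("spark-driver").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("spark-kubernetes-driver").endContainer()
  .endSpec()

val bootstrappedPod = bootstrap.bootstrapInitContainerAndVolumes(
  mainContainerName = "spark-kubernetes-driver",
  originalPodSpec = basePod)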

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 87 additions & 30 deletions
@@ -349,42 +349,43 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
   private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.serverCertPem")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
       .doc("Certificate PEM file to use when having the Kubernetes dependency server" +
         " listen on TLS.")
       .stringConf
       .createOptional
 
   private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
       .doc("File containing the keystore password for the Kubernetes dependency server.")
       .stringConf
       .createOptional
 
   private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile")
       .doc("File containing the key password for the Kubernetes dependency server.")
       .stringConf
       .createOptional
 
   private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.enabled")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled")
       .doc("Whether or not to use SSL when communicating with the dependency server.")
       .booleanConf
       .createOptional
   private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStore")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore")
       .doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
       .stringConf
       .createOptional
   private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword")
       .doc("Password for the trustStore for talking to the dependency server.")
       .stringConf
       .createOptional
   private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
-    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
+    ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStoreType")
       .doc("Type of trustStore for communicating with the dependency server.")
       .stringConf
       .createOptional
@@ -397,64 +398,120 @@
       .createOptional
 
   private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER =
-    ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier")
+    ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsResourceIdentifier")
       .doc("Identifier for the jars tarball that was uploaded to the staging service.")
       .internal()
       .stringConf
       .createOptional
 
   private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION =
-    ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation")
+    ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsSecretLocation")
       .doc("Location of the application secret to use when the init-container contacts the" +
         " resource staging server to download jars.")
       .internal()
       .stringConf
-      .createWithDefault(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH)
+      .createWithDefault(s"$INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH/" +
+        s"$INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY")
 
   private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER =
-    ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier")
+    ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesResourceIdentifier")
       .doc("Identifier for the files tarball that was uploaded to the staging service.")
       .internal()
       .stringConf
      .createOptional
 
   private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION =
-    ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation")
+    ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesSecretLocation")
       .doc("Location of the application secret to use when the init-container contacts the" +
         " resource staging server to download files.")
       .internal()
       .stringConf
-      .createWithDefault(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH)
+      .createWithDefault(
+        s"$INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH/$INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY")
+
+  private[spark] val INIT_CONTAINER_REMOTE_JARS =
+    ConfigBuilder("spark.kubernetes.initcontainer.remoteJars")
+      .doc("Comma-separated list of jar URIs to download in the init-container. This is" +
+        " calculated from spark.jars.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val INIT_CONTAINER_REMOTE_FILES =
+    ConfigBuilder("spark.kubernetes.initcontainer.remoteFiles")
+      .doc("Comma-separated list of file URIs to download in the init-container. This is" +
+        " calculated from spark.files.")
+      .internal()
+      .stringConf
+      .createOptional
 
   private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
-    ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image")
-      .doc("Image for the driver's init-container that downloads mounted dependencies.")
+    ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
+      .doc("Image for the driver and executor's init-container that downloads dependencies.")
       .stringConf
-      .createWithDefault(s"spark-driver-init:$sparkVersion")
+      .createWithDefault(s"spark-init:$sparkVersion")
 
-  private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION =
-    ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir")
-      .doc("Location to download local jars to in the driver. When using spark-submit, this" +
-        " directory must be empty and will be mounted as an empty directory volume on the" +
-        " driver pod.")
+  private[spark] val INIT_CONTAINER_JARS_DOWNLOAD_LOCATION =
+    ConfigBuilder("spark.kubernetes.mountdependencies.jarsDownloadDir")
+      .doc("Location to download jars to in the driver and executors. When using" +
+        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
+        " volume on the driver and executor pod.")
       .stringConf
-      .createWithDefault("/var/spark-data/spark-local-jars")
+      .createWithDefault("/var/spark-data/spark-submitted-jars")
 
-  private[spark] val DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION =
-    ConfigBuilder("spark.kubernetes.driver.mountdependencies.filesDownloadDir")
-      .doc("Location to download local files to in the driver. When using spark-submit, this" +
-        " directory must be empty and will be mounted as an empty directory volume on the" +
-        " driver pod.")
+  private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
+    ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
+      .doc("Location to download files to in the driver and executors. When using" +
+        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
+        " volume on the driver and executor pods.")
       .stringConf
-      .createWithDefault("/var/spark-data/spark-local-files")
+      .createWithDefault("/var/spark-data/spark-submitted-files")
 
-  private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT =
+  private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
     ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
       .doc("Timeout before aborting the attempt to download and unpack local dependencies from" +
-        " the dependency staging server when initializing the driver pod.")
+        " remote locations and the resource staging server when initializing the driver and" +
+        " executor pods.")
       .timeConf(TimeUnit.MINUTES)
       .createWithDefault(5)
 
+  private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP =
+    ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname")
+      .doc("Name of the config map to use in the init-container that retrieves submitted files" +
+        " for the executor.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY =
+    ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapkey")
+      .doc("Key for the entry in the init container config map for submitted files that" +
+        " corresponds to the properties for this init-container.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_INIT_CONTAINER_SECRET =
+    ConfigBuilder("spark.kubernetes.initcontainer.executor.stagingServerSecret.name")
+      .doc("Name of the secret to mount into the init-container that retrieves submitted files.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR =
+    ConfigBuilder("spark.kubernetes.initcontainer.executor.stagingServerSecret.mountDir")
+      .doc("Directory to mount the resource staging server secrets into for the executor" +
+        " init-containers. This must be exactly the same as the directory that the submission" +
+        " client mounted the secret into because the config map's properties specify the" +
+        " secret location as to be the same between the driver init-container and the executor" +
+        " init-container. Thus the submission client will always set this and the driver will" +
+        " never rely on a constant or convention, in order to protect against cases where the" +
+        " submission client has a different version from the driver itself, and hence might" +
+        " have different constants loaded in constants.scala.")
+      .internal()
+      .stringConf
+      .createOptional
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
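To see the renamed options end to end, here is an illustrative SparkConf snippet (not from the commit); the image tag and jar URL are placeholders, and the two download directories simply restate the new defaults from the diff above.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.initcontainer.docker.image", "spark-init:2.1.0")
  // These two restate the new defaults; override them only if your images
  // expect the dependencies somewhere else.
  .set("spark.kubernetes.mountdependencies.jarsDownloadDir",
    "/var/spark-data/spark-submitted-jars")
  .set("spark.kubernetes.mountdependencies.filesDownloadDir",
    "/var/spark-data/spark-submitted-files")
  .set("spark.kubernetes.mountdependencies.mountTimeout", "5m")
  // Remote URIs listed in spark.jars are what the init-container downloads.
  .set("spark.jars", "https://example.com/artifacts/app.jar")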

0 commit comments
