This repository was archived by the owner on Jan 9, 2020. It is now read-only.
File tree Expand file tree Collapse file tree 8 files changed +26
-5
lines changed
resource-managers/kubernetes/core/src
main/scala/org/apache/spark
scheduler/cluster/kubernetes
test/scala/org/apache/spark/deploy/kubernetes Expand file tree Collapse file tree 8 files changed +26
-5
lines changed Original file line number Diff line number Diff line change @@ -661,6 +661,13 @@ from the other deployment modes. See the [configuration page](configuration.html
661
661
Interval between reports of the current Spark job status in cluster mode.
662
662
</td >
663
663
</tr >
664
+ <tr >
665
+ <td ><code >spark.kubernetes.docker.image.pullPolicy</code ></td >
666
+ <td ><code >IfNotPresent</code ></td >
667
+ <td >
668
+ Docker image pull policy used when pulling Docker images with Kubernetes.
669
+ </td >
670
+ </tr >
664
671
</table >
665
672
666
673
Original file line number Diff line number Diff line change @@ -36,6 +36,7 @@ private[spark] trait SparkPodInitContainerBootstrap {
36
36
37
37
private [spark] class SparkPodInitContainerBootstrapImpl (
38
38
initContainerImage : String ,
39
+ dockerImagePullPolicy : String ,
39
40
jarsDownloadPath : String ,
40
41
filesDownloadPath : String ,
41
42
downloadTimeoutMinutes : Long ,
@@ -60,7 +61,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
60
61
val initContainer = new ContainerBuilder ()
61
62
.withName(s " spark-init " )
62
63
.withImage(initContainerImage)
63
- .withImagePullPolicy(" IfNotPresent " )
64
+ .withImagePullPolicy(dockerImagePullPolicy )
64
65
.addNewVolumeMount()
65
66
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME )
66
67
.withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR )
Original file line number Diff line number Diff line change @@ -47,6 +47,12 @@ package object config extends Logging {
47
47
.stringConf
48
48
.createWithDefault(s " spark-executor: $sparkVersion" )
49
49
50
+ private [spark] val DOCKER_IMAGE_PULL_POLICY =
51
+ ConfigBuilder (" spark.kubernetes.docker.image.pullPolicy" )
52
+ .doc(" Docker image pull policy when pulling any docker image in Kubernetes integration" )
53
+ .stringConf
54
+ .createWithDefault(" IfNotPresent" )
55
+
50
56
private [spark] val APISERVER_AUTH_SUBMISSION_CONF_PREFIX =
51
57
" spark.kubernetes.authenticate.submission"
52
58
private [spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
Original file line number Diff line number Diff line change @@ -60,6 +60,7 @@ private[spark] class Client(
60
60
private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME )
61
61
.getOrElse(kubernetesAppId)
62
62
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE )
63
+ private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY )
63
64
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY )
64
65
private val memoryOverheadMb = sparkConf
65
66
.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD )
@@ -99,7 +100,7 @@ private[spark] class Client(
99
100
val driverContainer = new ContainerBuilder ()
100
101
.withName(DRIVER_CONTAINER_NAME )
101
102
.withImage(driverDockerImage)
102
- .withImagePullPolicy(" IfNotPresent " )
103
+ .withImagePullPolicy(dockerImagePullPolicy )
103
104
.addToEnv(driverExtraClasspathEnv.toSeq: _* )
104
105
.addNewEnv()
105
106
.withName(ENV_DRIVER_MEMORY )
Original file line number Diff line number Diff line change @@ -104,6 +104,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
104
104
private val configMapName = s " $kubernetesAppId-init-config "
105
105
private val configMapKey = s " $kubernetesAppId-init-config-key "
106
106
private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE )
107
+ private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY )
107
108
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT )
108
109
109
110
override def provideInitContainerConfigMapBuilder (
@@ -196,6 +197,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
196
197
}
197
198
new SparkPodInitContainerBootstrapImpl (
198
199
initContainerImage,
200
+ dockerImagePullPolicy,
199
201
jarsDownloadPath,
200
202
filesDownloadPath,
201
203
downloadTimeoutMinutes,
Original file line number Diff line number Diff line change @@ -46,7 +46,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
46
46
val maybeExecutorInitContainerSecretName =
47
47
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET )
48
48
val maybeExecutorInitContainerSecretMount =
49
- sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR )
49
+ sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR )
50
50
val executorInitContainerSecretVolumePlugin = for {
51
51
initContainerSecretName <- maybeExecutorInitContainerSecretName
52
52
initContainerSecretMountPath <- maybeExecutorInitContainerSecretMount
@@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
65
65
} yield {
66
66
new SparkPodInitContainerBootstrapImpl (
67
67
sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE ),
68
+ sparkConf.get(DOCKER_IMAGE_PULL_POLICY ),
68
69
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION ),
69
70
sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION ),
70
71
sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT ),
@@ -95,4 +96,3 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
95
96
scheduler.asInstanceOf [TaskSchedulerImpl ].initialize(backend)
96
97
}
97
98
}
98
-
Original file line number Diff line number Diff line change @@ -77,6 +77,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
77
77
78
78
private var shufflePodCache : Option [ShufflePodCache ] = None
79
79
private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE )
80
+ private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY )
80
81
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE )
81
82
private val executorPort = conf.getInt(" spark.executor.port" , DEFAULT_STATIC_PORT )
82
83
private val blockmanagerPort = conf
@@ -354,7 +355,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
354
355
.addNewContainer()
355
356
.withName(s " executor " )
356
357
.withImage(executorDockerImage)
357
- .withImagePullPolicy(" IfNotPresent " )
358
+ .withImagePullPolicy(dockerImagePullPolicy )
358
359
.withNewResources()
359
360
.addToRequests(" memory" , executorMemoryQuantity)
360
361
.addToLimits(" memory" , executorMemoryLimitQuantity)
Original file line number Diff line number Diff line change @@ -27,6 +27,7 @@ import org.apache.spark.deploy.kubernetes.constants._
27
27
class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAfter {
28
28
private val OBJECT_MAPPER = new ObjectMapper ()
29
29
private val INIT_CONTAINER_IMAGE = " spark-init:latest"
30
+ private val DOCKER_IMAGE_PULL_POLICY = " IfNotPresent"
30
31
private val JARS_DOWNLOAD_PATH = " /var/data/spark-jars"
31
32
private val FILES_DOWNLOAD_PATH = " /var/data/spark-files"
32
33
private val DOWNLOAD_TIMEOUT_MINUTES = 5
@@ -137,6 +138,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
137
138
private def bootstrapPodWithoutSubmittedDependencies (): Pod = {
138
139
val bootstrapUnderTest = new SparkPodInitContainerBootstrapImpl (
139
140
INIT_CONTAINER_IMAGE ,
141
+ DOCKER_IMAGE_PULL_POLICY ,
140
142
JARS_DOWNLOAD_PATH ,
141
143
FILES_DOWNLOAD_PATH ,
142
144
DOWNLOAD_TIMEOUT_MINUTES ,
@@ -150,6 +152,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
150
152
private def bootstrapPodWithSubmittedDependencies (): Pod = {
151
153
val bootstrapUnderTest = new SparkPodInitContainerBootstrapImpl (
152
154
INIT_CONTAINER_IMAGE ,
155
+ DOCKER_IMAGE_PULL_POLICY ,
153
156
JARS_DOWNLOAD_PATH ,
154
157
FILES_DOWNLOAD_PATH ,
155
158
DOWNLOAD_TIMEOUT_MINUTES ,
You can’t perform that action at this time.
0 commit comments