
Commit 5178d0c

Andrew Korzhuev authored and Robert Kruszewski committed
[SPARK-23668][K8S] Add config option for passing through k8s Pod.spec.imagePullSecrets
## What changes were proposed in this pull request?

Pass through the `imagePullSecrets` option to the k8s pod in order to allow user to access private image registries.

See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/

## How was this patch tested?

Unit tests + manual testing. Manual testing procedure:

1. Have private image registry.
2. Spark-submit application with no `spark.kubernetes.imagePullSecret` set. Do `kubectl describe pod ...`. See the error message:
   ```
   Error syncing pod, skipping: failed to "StartContainer" for "spark-kubernetes-driver" with ErrImagePull: "rpc error: code = 2 desc = Error: Status 400 trying to pull repository ...: \"{\\n \\\"errors\\\" : [ {\\n \\\"status\\\" : 400,\\n \\\"message\\\" : \\\"Unsupported docker v1 repository request for '...'\\\"\\n } ]\\n}\""
   ```
3. Create secret `kubectl create secret docker-registry ...`
4. Spark-submit with `spark.kubernetes.imagePullSecret` set to the new secret. See that deployment was successful.

Author: Andrew Korzhuev <[email protected]>
Author: Andrew Korzhuev <[email protected]>

Closes apache#20811 from andrusha/spark-23668-image-pull-secrets.
1 parent 80052c8 · commit 5178d0c
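For orientation, a minimal sketch (not part of the patch) of how an application could opt in once a registry secret has been created with `kubectl create secret docker-registry ...`; the image URL and secret name below are placeholders:

```scala
import org.apache.spark.SparkConf

// Illustrative only: "registry.example.com/..." and "my-registry-secret" are
// placeholder values; the secret must already exist in the namespace the pods run in.
object PrivateRegistryConfSketch {
  def build(): SparkConf = new SparkConf()
    .set("spark.kubernetes.container.image", "registry.example.com/spark/spark:latest")
    .set("spark.kubernetes.container.image.pullSecrets", "my-registry-secret")
}
```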


7 files changed, +78 -2 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 7 additions & 0 deletions

```diff
@@ -54,6 +54,13 @@ private[spark] object Config extends Logging {
     .checkValues(Set("Always", "Never", "IfNotPresent"))
     .createWithDefault("IfNotPresent")
 
+  val IMAGE_PULL_SECRETS =
+    ConfigBuilder("spark.kubernetes.container.image.pullSecrets")
+      .doc("Comma separated list of the Kubernetes secrets used " +
+        "to access private image registries.")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
```
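Because the entry ends in `.createOptional`, downstream code reads it as an `Option[String]`. A minimal sketch of that resolution, assuming it lives inside the Spark k8s module (the typed `SparkConf.get(ConfigEntry)` used here is `private[spark]`) and with made-up secret names:

```scala
package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config.IMAGE_PULL_SECRETS

object ImagePullSecretsConfigSketch {
  def main(args: Array[String]): Unit = {
    // "regcred-a" / "regcred-b" are placeholder secret names.
    val conf = new SparkConf(false)
      .set("spark.kubernetes.container.image.pullSecrets", "regcred-a,regcred-b")

    // Resolves to Some("regcred-a,regcred-b"); None when the key is never set.
    val secrets: Option[String] = conf.get(IMAGE_PULL_SECRETS)
    println(secrets)
  }
}
```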

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 13 additions & 0 deletions

```diff
@@ -18,6 +18,8 @@ package org.apache.spark.deploy.k8s
 
 import java.nio.file.Paths
 
+import io.fabric8.kubernetes.api.model.LocalObjectReference
+
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 
@@ -37,6 +39,17 @@ private[spark] object KubernetesUtils {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
+  /**
+   * Parses comma-separated list of imagePullSecrets into K8s-understandable format
+   */
+  def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
+    imagePullSecrets match {
+      case Some(secretsCommaSeparated) =>
+        secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList
+      case None => Nil
+    }
+  }
+
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala

Lines changed: 6 additions & 1 deletion

```diff
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.steps
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
@@ -51,6 +51,8 @@ private[spark] class BasicDriverConfigurationStep(
     .get(DRIVER_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the driver container image"))
 
+  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
+
   // CPU settings
   private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
   private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
@@ -129,6 +131,8 @@ private[spark] class BasicDriverConfigurationStep(
       case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
     }
 
+    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
+
     val baseDriverPod = new PodBuilder(driverSpec.driverPod)
       .editOrNewMetadata()
         .withName(driverPodName)
@@ -138,6 +142,7 @@ private[spark] class BasicDriverConfigurationStep(
       .withNewSpec()
        .withRestartPolicy("Never")
        .withNodeSelector(nodeSelector.asJava)
+        .withImagePullSecrets(parsedImagePullSecrets.asJava)
       .endSpec()
     .build()
 
```
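For readers unfamiliar with the fabric8 builder API used above, here is a small self-contained sketch (not part of the patch) of how a parsed secret list ends up on the pod spec; the pod name and secret names are invented:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{LocalObjectReference, Pod, PodBuilder}

object ImagePullSecretsPodSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for KubernetesUtils.parseImagePullSecrets(Some("regcred-a,regcred-b")).
    val secrets: List[LocalObjectReference] =
      List("regcred-a", "regcred-b").map(new LocalObjectReference(_))

    val pod: Pod = new PodBuilder()
      .withNewMetadata()
        .withName("sketch-driver")
      .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")
        .withImagePullSecrets(secrets.asJava)
      .endSpec()
      .build()

    // Each name surfaces as pod.spec.imagePullSecrets[*].name.
    println(pod.getSpec.getImagePullSecrets.asScala.map(_.getName))
  }
}
```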

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -70,6 +70,7 @@ private[spark] class ExecutorPodFactory(
     .get(EXECUTOR_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the executor container image"))
   private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
   private val blockManagerPort = sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
@@ -105,6 +106,8 @@ private[spark] class ExecutorPodFactory(
       nodeToLocalTaskCount: Map[String, Int]): Pod = {
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
+    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
+
     // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
     // name as the hostname. This preserves uniqueness since the end of name contains
     // executorId
@@ -196,6 +199,7 @@ private[spark] class ExecutorPodFactory(
         .withHostname(hostname)
         .withRestartPolicy("Never")
         .withNodeSelector(nodeSelector.asJava)
+        .withImagePullSecrets(parsedImagePullSecrets.asJava)
       .endSpec()
       .build()
 
```

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala

Lines changed: 36 additions & 0 deletions

```diff
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.LocalObjectReference
+
+import org.apache.spark.SparkFunSuite
+
+class KubernetesUtilsTest extends SparkFunSuite {
+
+  test("testParseImagePullSecrets") {
+    val noSecrets = KubernetesUtils.parseImagePullSecrets(None)
+    assert(noSecrets === Nil)
+
+    val oneSecret = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
+    assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil)
+
+    val commaSeparatedSecrets = KubernetesUtils.parseImagePullSecrets(Some("s1, s2 , s3,s4"))
+    assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: "s4" :: Nil)
+  }
+
+}
```

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala

Lines changed: 7 additions & 1 deletion

```diff
@@ -51,6 +51,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
     .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
     .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
     .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
+    .set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2")
 
   val submissionStep = new BasicDriverConfigurationStep(
     APP_ID,
@@ -103,7 +104,12 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
       CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
       SPARK_APP_NAME_ANNOTATION -> APP_NAME)
     assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
-    assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
+
+    val driverPodSpec = preparedDriverSpec.driverPod.getSpec
+    assert(driverPodSpec.getRestartPolicy === "Never")
+    assert(driverPodSpec.getImagePullSecrets.size() === 2)
+    assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
+    assert(driverPodSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
 
     val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
     val expectedSparkConf = Map(
```

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 5 additions & 0 deletions

```diff
@@ -34,6 +34,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
   private val driverPodUid: String = "driver-uid"
   private val executorPrefix: String = "base"
   private val executorImage: String = "executor-image"
+  private val imagePullSecrets: String = "imagePullSecret1, imagePullSecret2"
   private val driverPod = new PodBuilder()
     .withNewMetadata()
       .withName(driverPodName)
@@ -55,6 +56,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
       .set(CONTAINER_IMAGE, executorImage)
       .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
+      .set(IMAGE_PULL_SECRETS, imagePullSecrets)
   }
 
   test("basic executor pod has reasonable defaults") {
@@ -77,6 +79,9 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .getRequests.get("memory").getAmount === "1408Mi")
     assert(executor.getSpec.getContainers.get(0).getResources
       .getLimits.get("memory").getAmount === "1408Mi")
+    assert(executor.getSpec.getImagePullSecrets.size() === 2)
+    assert(executor.getSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
+    assert(executor.getSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
 
     // The pod has no node selector, volumes.
     assert(executor.getSpec.getNodeSelector.isEmpty)
```
