diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index c6772c1cb5ae4..e746410772f62 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -69,6 +69,18 @@ package object config extends Logging {
   private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
   private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
+  // TODO: This option is intended to be used for internal prototyping only, until the
+  // submission client automatically creates the secret file. Remove this option afterward
+  // unless another use is found.
+  private[spark] val MOUNTED_HADOOP_SECRET_CONF =
+    ConfigBuilder("spark.kubernetes.mounted.hadoopSecret")
+      .doc("Use a Kubernetes secret containing Hadoop tokens such as an HDFS delegation token." +
+        " The secret should have an entry named 'hadoop-token-file' under the data section," +
+        " which contains binary dumps of Hadoop tokens.")
+      .internal()
+      .stringConf
+      .createOptional
+
   private[spark] val RESOURCE_STAGING_SERVER_USE_SERVICE_ACCOUNT_CREDENTIALS =
     ConfigBuilder(
       s"$APISERVER_AUTH_RESOURCE_STAGING_SERVER_CONF_PREFIX.useServiceAccountCredentials")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 92f051b2ac298..f39a3fc2f4bb5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -43,6 +43,13 @@ package object constants {
     s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
   private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
 
+  // Hadoop credentials secrets for the Spark app.
+  private[spark] val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
+  private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME = "hadoop-token-file"
+  private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_PATH =
+    s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME"
+  private[spark] val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
+
   // Default and fixed ports
   private[spark] val SUBMISSION_SERVER_PORT = 7077
   private[spark] val DEFAULT_DRIVER_PORT = 7078
@@ -69,6 +76,7 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  private[spark] val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
index 82abe55ac6989..06f1547e6a5b1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps._
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
@@ -94,6 +94,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       submissionSparkConf)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)
+    val hadoopCredentialsStep = new DriverHadoopCredentialsStep(submissionSparkConf)
     val pythonStep = mainAppResource match {
       case PythonMainAppResource(mainPyResource) =>
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -131,6 +132,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     Seq(
       initialSubmissionStep,
       kubernetesCredentialsStep,
+      hadoopCredentialsStep,
       dependencyResolutionStep) ++
       initContainerBootstrapStep.toSeq ++
       pythonStep.toSeq
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/HadoopSecretUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/HadoopSecretUtil.scala
new file mode 100644
index 0000000000000..b7a9e9e3fd6cd
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/HadoopSecretUtil.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+object HadoopSecretUtil {
+
+  def configurePod(secretNameOption: Option[String], pod: Pod) : Pod = {
+    secretNameOption.map { secret =>
+      new PodBuilder(pod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+            .withNewSecret()
+              .withSecretName(secret)
+              .endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(pod)
+  }
+
+  def configureContainer(secretNameOption: Option[String],
+      containerSpec: Container) : Container = {
+    secretNameOption.map { secret =>
+      new ContainerBuilder(containerSpec)
+        .addNewVolumeMount()
+          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+          .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
+          .endVolumeMount()
+        .addNewEnv()
+          .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
+          .withValue(SPARK_APP_HADOOP_TOKEN_FILE_PATH)
+          .endEnv()
+        .build()
+    }.getOrElse(containerSpec)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverHadoopCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverHadoopCredentialsStep.scala
new file mode 100644
index 0000000000000..88f2e7b3836c8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverHadoopCredentialsStep.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.submit.HadoopSecretUtil
+
+private[spark] class DriverHadoopCredentialsStep(submissionSparkConf: SparkConf)
+  extends DriverConfigurationStep {
+
+  private val maybeMountedHadoopSecret = submissionSparkConf.get(MOUNTED_HADOOP_SECRET_CONF)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val podWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
+      driverSpec.driverPod)
+    val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
+      maybeMountedHadoopSecret,
+      driverSpec.driverContainer)
+    driverSpec.copy(
+      driverPod = podWithMountedHadoopToken,
+      driverContainer = containerWithMountedHadoopToken)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index a50a9c8bb9c3b..f6aecc402c0fe 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
+import org.apache.spark.deploy.kubernetes.submit.{HadoopSecretUtil, InitContainerUtil}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
@@ -130,6 +130,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
 
+  private val maybeMountedHadoopSecret = conf.get(MOUNTED_HADOOP_SECRET_CONF)
+
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
       withName(kubernetesDriverPodName).get()
@@ -582,9 +584,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
       val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful(
         executorPodWithInitContainer, nodeToLocalTaskCount)
-      val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity)
+      val executorPodWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
+        executorPodWithNodeAffinity)
+      val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
+        maybeMountedHadoopSecret, initBootstrappedExecutorContainer)
+
+      val resolvedExecutorPod = new PodBuilder(executorPodWithMountedHadoopToken)
         .editSpec()
-          .addToContainers(initBootstrappedExecutorContainer)
+          .addToContainers(containerWithMountedHadoopToken)
         .endSpec()
         .build()
       try {
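
Note on creating the secret: the 'hadoop-token-file' entry that spark.kubernetes.mounted.hadoopSecret expects must contain tokens serialized in Hadoop's token-storage format. Below is a minimal sketch of one way to build such a secret with the fabric8 client already used by this codebase; the object name HadoopTokenSecretSketch, the secret name "hadoop-token-secret", and the namespace "default" are illustrative assumptions, while the 'hadoop-token-file' key is fixed by the contract above.

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import java.util.Base64

    import io.fabric8.kubernetes.api.model.SecretBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    import org.apache.hadoop.security.UserGroupInformation

    // Illustrative sketch, not part of this change: dump the current user's
    // Hadoop tokens into a Kubernetes secret in the layout that
    // MOUNTED_HADOOP_SECRET_CONF documents.
    object HadoopTokenSecretSketch {
      def main(args: Array[String]): Unit = {
        // Serialize tokens in Hadoop's token-storage format, the same binary
        // layout that readers of HADOOP_TOKEN_FILE_LOCATION expect.
        val credentials = UserGroupInformation.getCurrentUser.getCredentials
        val buffer = new ByteArrayOutputStream()
        val out = new DataOutputStream(buffer)
        credentials.writeTokenStorageToStream(out)
        out.close()

        // Values under a secret's 'data' section are base64-encoded.
        val encodedTokens = Base64.getEncoder.encodeToString(buffer.toByteArray)

        val client = new DefaultKubernetesClient()
        try {
          val secret = new SecretBuilder()
            .withNewMetadata()
              .withName("hadoop-token-secret") // assumed name; any name works
              .endMetadata()
            // The key must match SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME.
            .addToData("hadoop-token-file", encodedTokens)
            .build()
          client.secrets().inNamespace("default").createOrReplace(secret)
        } finally {
          client.close()
        }
      }
    }

The application would then be submitted with spark.kubernetes.mounted.hadoopSecret=hadoop-token-secret, and both the driver pod and every executor pod would mount the secret at /mnt/secrets/hadoop-credentials.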
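Note on consumption: this change does not parse the token file anywhere. HADOOP_TOKEN_FILE_LOCATION is a standard Hadoop environment variable; when Hadoop initializes the login user inside the driver and executor JVMs, UserGroupInformation reads the file at that location and merges its tokens into the user's credentials. A sanity check that application code could run (a sketch, not part of this change):

    import scala.collection.JavaConverters._

    import org.apache.hadoop.security.UserGroupInformation

    // Lists the tokens Hadoop loaded from the mounted secret via
    // HADOOP_TOKEN_FILE_LOCATION; an HDFS delegation token appears
    // with kind HDFS_DELEGATION_TOKEN.
    UserGroupInformation.getCurrentUser.getTokens.asScala.foreach { token =>
      println(s"token kind=${token.getKind}, service=${token.getService}")
    }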