Skip to content

Commit 9783aa7

Browse files
committed
Move local file mounting to new architecture.
1 parent 486669a commit 9783aa7

18 files changed

+366
-284
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ private[spark] object Constants {
7070

7171
val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
7272
// Spark app configs for containers
73+
val MOUNTED_FILES_SECRET_DIR = "/var/data/spark-submitted-files"
7374
val SPARK_CONF_VOLUME = "spark-conf-volume"
7475
val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
7576
val SPARK_CONF_FILE_NAME = "spark.properties"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
5151
roleSpecificConf: T,
5252
appResourceNamePrefix: String,
5353
appId: String,
54+
mountLocalFilesSecretName: Option[String],
5455
roleLabels: Map[String, String],
5556
roleAnnotations: Map[String, String],
5657
roleSecretNamesToMountPaths: Map[String, String],
@@ -137,6 +138,7 @@ private[spark] object KubernetesConf {
137138
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
138139
appResourceNamePrefix,
139140
appId,
141+
Some(s"$appResourceNamePrefix-submitted-local-files"),
140142
driverLabels,
141143
driverAnnotations,
142144
driverSecretNamesToMountPaths,
@@ -176,6 +178,7 @@ private[spark] object KubernetesConf {
176178
KubernetesExecutorSpecificConf(executorId, driverPod),
177179
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
178180
appId,
181+
sparkConf.get(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET),
179182
executorLabels,
180183
executorAnnotations,
181184
executorSecrets,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.spark.deploy.k8s
1818

19+
import java.io.File
20+
1921
import io.fabric8.kubernetes.api.model.LocalObjectReference
2022

2123
import org.apache.spark.SparkConf
@@ -52,6 +54,17 @@ private[spark] object KubernetesUtils {
5254
}
5355
}
5456

57+
def submitterLocalFiles(fileUris: Iterable[String]): Iterable[String] = {
58+
fileUris
59+
.map(Utils.resolveURI)
60+
.filter { file =>
61+
Option(file.getScheme).getOrElse("file") == "file"
62+
}
63+
.map(_.getPath)
64+
.map(new File(_))
65+
.map(_.getAbsolutePath)
66+
}
67+
5568
private def resolveFileUri(uri: String): String = {
5669
val fileUri = Utils.resolveURI(uri)
5770
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import java.io.File
20+
import java.nio.file.Paths
21+
22+
import com.google.common.io.{BaseEncoding, Files}
23+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}
24+
import scala.collection.JavaConverters._
25+
26+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
27+
import org.apache.spark.deploy.k8s.Config._
28+
import org.apache.spark.deploy.k8s.Constants._
29+
import org.apache.spark.util.Utils
30+
31+
private[spark] class MountLocalFilesFeatureStep(
32+
kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
33+
extends KubernetesFeatureConfigStep {
34+
require(kubernetesConf.mountLocalFilesSecretName.isDefined,
35+
"Shouldn't be using this feature without a secret name.")
36+
private val secretName = kubernetesConf.mountLocalFilesSecretName.get
37+
38+
override def configurePod(pod: SparkPod): SparkPod = {
39+
val resolvedPod = new PodBuilder(pod.pod)
40+
.editOrNewSpec()
41+
.addNewVolume()
42+
.withName("submitted-files")
43+
.withNewSecret()
44+
.withSecretName(secretName)
45+
.endSecret()
46+
.endVolume()
47+
.endSpec()
48+
.build()
49+
val resolvedContainer = new ContainerBuilder(pod.container)
50+
.addNewEnv()
51+
.withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR)
52+
.withValue(MOUNTED_FILES_SECRET_DIR)
53+
.endEnv()
54+
.addNewVolumeMount()
55+
.withName("submitted-files")
56+
.withMountPath(MOUNTED_FILES_SECRET_DIR)
57+
.endVolumeMount()
58+
.build()
59+
SparkPod(resolvedPod, resolvedContainer)
60+
}
61+
62+
override def getAdditionalPodSystemProperties(): Map[String, String] = {
63+
val allFiles = kubernetesConf.sparkFiles()
64+
val resolvedFiles = allFiles
65+
.map(file => {
66+
val uri = Utils.resolveURI(file)
67+
val scheme = Option(uri.getScheme).getOrElse("file")
68+
if (scheme != "file") {
69+
file
70+
} else {
71+
val fileName = Paths.get(uri.getPath).getFileName.toString
72+
s"$MOUNTED_FILES_SECRET_DIR/$fileName"
73+
}
74+
})
75+
Map(
76+
EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key -> secretName,
77+
"spark.files" -> resolvedFiles.mkString(","))
78+
}
79+
80+
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
81+
val localFiles = kubernetesConf.sparkFiles()
82+
.map(Utils.resolveURI)
83+
.filter { file =>
84+
Option(file.getScheme).getOrElse("file") == "file"
85+
}
86+
.map(_.getPath)
87+
.map(new File(_))
88+
val localFileBase64Contents = localFiles.map { file =>
89+
val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file))
90+
(file.getName, fileBase64)
91+
}.toMap
92+
val localFilesSecret = new SecretBuilder()
93+
.withNewMetadata()
94+
.withName(secretName)
95+
.endMetadata()
96+
.withData(localFileBase64Contents.asJava)
97+
.build()
98+
Seq(localFilesSecret)
99+
}
100+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.submit
1818

19-
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
20-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}
19+
import java.io.File
20+
21+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils}
22+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
23+
import org.apache.spark.util.Utils
2124

2225
private[spark] class KubernetesDriverBuilder(
2326
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -29,18 +32,36 @@ private[spark] class KubernetesDriverBuilder(
2932
new DriverServiceFeatureStep(_),
3033
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
3134
=> MountSecretsFeatureStep) =
32-
new MountSecretsFeatureStep(_)) {
35+
new MountSecretsFeatureStep(_),
36+
provideMountLocalFilesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
37+
=> MountLocalFilesFeatureStep) =
38+
new MountLocalFilesFeatureStep(_)) {
39+
import KubernetesDriverBuilder._
3340

3441
def buildFromFeatures(
3542
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
3643
val baseFeatures = Seq(
3744
provideBasicStep(kubernetesConf),
3845
provideCredentialsStep(kubernetesConf),
3946
provideServiceStep(kubernetesConf))
40-
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
47+
val withProvideSecretsStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
4148
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
4249
} else baseFeatures
4350

51+
val sparkFiles = kubernetesConf.sparkFiles()
52+
val localFiles = KubernetesUtils.submitterLocalFiles(sparkFiles).map(new File(_))
53+
require(localFiles.forall(_.isFile), s"All submitted local files must be present and not" +
54+
s" directories. Got: ${localFiles.map(_.getAbsolutePath).mkString(",")}")
55+
56+
val totalFileSize = localFiles.map(_.length()).sum
57+
val totalSizeBytesString = Utils.bytesToString(totalFileSize)
58+
require(totalFileSize < MAX_SECRET_BUNDLE_SIZE_BYTES,
59+
s"Total size of all files submitted must be less than $MAX_SECRET_BUNDLE_SIZE_BYTES_STRING." +
60+
s" Total size for files ended up being $totalSizeBytesString")
61+
val allFeatures = if (localFiles.nonEmpty) {
62+
withProvideSecretsStep ++ Seq(provideMountLocalFilesStep(kubernetesConf))
63+
} else withProvideSecretsStep
64+
4465
var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
4566
for (feature <- allFeatures) {
4667
val configuredPod = feature.configurePod(spec.pod)
@@ -54,3 +75,9 @@ private[spark] class KubernetesDriverBuilder(
5475
spec
5576
}
5677
}
78+
79+
private object KubernetesDriverBuilder {
80+
val MAX_SECRET_BUNDLE_SIZE_BYTES = 10240
81+
val MAX_SECRET_BUNDLE_SIZE_BYTES_STRING =
82+
Utils.bytesToString(MAX_SECRET_BUNDLE_SIZE_BYTES)
83+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountLocalFilesStep.scala

Lines changed: 0 additions & 76 deletions
This file was deleted.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,27 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
20-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountSecretsFeatureStep}
20+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2121

2222
private[spark] class KubernetesExecutorBuilder(
2323
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
2424
new BasicExecutorFeatureStep(_),
2525
provideSecretsStep:
2626
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
27-
new MountSecretsFeatureStep(_)) {
27+
new MountSecretsFeatureStep(_),
28+
provideMountLocalFilesStep:
29+
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountLocalFilesFeatureStep =
30+
new MountLocalFilesFeatureStep(_)) {
2831

2932
def buildFromFeatures(
3033
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
3134
val baseFeatures = Seq(provideBasicStep(kubernetesConf))
32-
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
35+
val withProvideSecretsStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
3336
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
3437
} else baseFeatures
38+
val allFeatures = if (kubernetesConf.mountLocalFilesSecretName.isDefined) {
39+
withProvideSecretsStep ++ Seq(provideMountLocalFilesStep(kubernetesConf))
40+
} else withProvideSecretsStep
3541
var executorPod = SparkPod.initialPod()
3642
for (feature <- allFeatures) {
3743
executorPod = feature.configurePod(executorPod)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala

Lines changed: 0 additions & 36 deletions
This file was deleted.

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
6666
APP_ARGS),
6767
RESOURCE_NAME_PREFIX,
6868
APP_ID,
69+
None,
6970
DRIVER_LABELS,
7071
DRIVER_ANNOTATIONS,
7172
Map.empty,
@@ -135,6 +136,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
135136
APP_ARGS),
136137
RESOURCE_NAME_PREFIX,
137138
APP_ID,
139+
None,
138140
DRIVER_LABELS,
139141
DRIVER_ANNOTATIONS,
140142
Map.empty,

0 commit comments

Comments
 (0)