This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit e37b0cf

mccheah authored and foxish committed
Clean up resources that are not used by pods. (#305)
* Clean up resources that are not used by pods.
* Make the client side send correct credentials.
* Simplify cleanup logic. Cancellation is no longer instantaneous, and we might clean up a little later than the given TTL. However, the tradeoff is a simpler implementation with clearer contracts about when things will and will not be cleaned up.
* Remove class.
* Fix imports and line length.
* Remove import.
* Add a unit test for StagingResourcesStore.
* Revamp cleanup process (a sketch of the rule follows the list):
  - Delete resources immediately when their owners do not exist.
  - Delete resources if, after they are first uploaded, they are not accessed for a certain period of time.
  - Resource owners are more specifically defined and can have a type (currently only pods are used).
* Clarify log messages.
* Use a single set of credentials in the resource staging server; also refactor construction of Kubernetes clients to unify the code paths.
* Fix unit test.
* Safely close if creating the shuffle block handler fails.
* Use an implicit class.
* Address comments.
* Fix broken test.
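The revamped cleanup rule is the heart of the change: a resource bundle is deleted as soon as its owner no longer exists, or once it has gone unaccessed for longer than the TTL, with sweeps running on a fixed delay so deletion can land somewhat after the deadline. A minimal sketch of that contract follows; the class and member names are hypothetical, not the patch's actual StagingResourcesStore:

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

import scala.collection.JavaConverters._

// Hypothetical illustration of the cleanup contract described above: sweeps
// run on a fixed delay, so a bundle may be deleted a little after its TTL
// elapses, but never before.
class IdleResourceCleaner[K](
    ttlMillis: Long,
    ownerExists: K => Boolean,
    deleteResource: K => Unit) {

  private val lastAccess = new ConcurrentHashMap[K, Long]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Record an upload of, or access to, a resource bundle.
  def touch(resource: K): Unit = lastAccess.put(resource, System.currentTimeMillis())

  def start(): Unit = {
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val now = System.currentTimeMillis()
        lastAccess.asScala.foreach { case (resource, seenAt) =>
          // Delete immediately when the owner (currently only a pod) is gone,
          // or when the bundle has been idle past the TTL.
          if (!ownerExists(resource) || now - seenAt > ttlMillis) {
            deleteResource(resource)
            lastAccess.remove(resource)
          }
        }
      }
    }, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdown()
}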
1 parent bb1b234 · commit e37b0cf

32 files changed: +1242 -761 lines

docs/running-on-kubernetes.md

Lines changed: 63 additions & 0 deletions
@@ -450,6 +450,69 @@ from the other deployment modes. See the [configuration page](configuration.html
     client cert file, and/or OAuth token.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.resourceStagingServer.caCertFile</code></td>
+  <td>(none)</td>
+  <td>
+    Path to the CA cert file for connecting to the Kubernetes API server over TLS from the resource staging server
+    when it monitors objects to determine when to clean up resource bundles.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.resourceStagingServer.clientKeyFile</code></td>
+  <td>(none)</td>
+  <td>
+    Path to the client key file for authenticating against the Kubernetes API server from the resource staging server
+    when it monitors objects to determine when to clean up resource bundles. The resource staging server must have
+    credentials that allow it to view API objects in any namespace.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.resourceStagingServer.clientCertFile</code></td>
+  <td>(none)</td>
+  <td>
+    Path to the client cert file for authenticating against the Kubernetes API server from the resource staging server
+    when it monitors objects to determine when to clean up resource bundles. The resource staging server must have
+    credentials that allow it to view API objects in any namespace.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.resourceStagingServer.oauthToken</code></td>
+  <td>(none)</td>
+  <td>
+    OAuth token value for authenticating against the Kubernetes API server from the resource staging server
+    when it monitors objects to determine when to clean up resource bundles. The resource staging server must have
+    credentials that allow it to view API objects in any namespace. Note that this cannot be set at the same time as
+    <code>spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile</code></td>
+  <td>(none)</td>
+  <td>
+    File containing the OAuth token to use when authenticating against the Kubernetes API server from the
+    resource staging server, when it monitors objects to determine when to clean up resource bundles. The resource
+    staging server must have credentials that allow it to view API objects in any namespace. Note that this cannot be
+    set at the same time as <code>spark.kubernetes.authenticate.resourceStagingServer.oauthToken</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.resourceStagingServer.useServiceAccountCredentials</code></td>
+  <td>true</td>
+  <td>
+    Whether or not to use a service account token and a service account CA certificate when the resource staging server
+    authenticates to Kubernetes. If this is set, interactions with Kubernetes will authenticate using a token located at
+    <code>/var/run/secrets/kubernetes.io/serviceaccount/token</code> and the CA certificate located at
+    <code>/var/run/secrets/kubernetes.io/serviceaccount/ca.crt</code>. Note that if
+    <code>spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile</code> is set, it takes precedence
+    over the usage of the service account token file. Also, if
+    <code>spark.kubernetes.authenticate.resourceStagingServer.caCertFile</code> is set, it takes precedence over using
+    the service account's CA certificate file. This should generally be set to true (the default value) when the
+    resource staging server is deployed as a Kubernetes pod, but should be set to false if the resource staging server
+    is deployed by other means (e.g. when running the staging server process outside of Kubernetes). The resource
+    staging server must have credentials that allow it to view API objects in any namespace.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
   <td>executorMemory * 0.10, with minimum of 384</td>
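Taken together, these properties control how the resource staging server authenticates to the API server while it watches for defunct resource owners. A minimal sketch of setting them in code, assuming a staging server deployed outside Kubernetes; both file paths are hypothetical:

import org.apache.spark.SparkConf

// Hypothetical setup for a staging server running outside the cluster:
// service-account credentials are turned off, so explicit credentials are
// required to watch API objects during cleanup.
val stagingServerConf = new SparkConf(loadDefaults = false)
  .set("spark.kubernetes.authenticate.resourceStagingServer.useServiceAccountCredentials",
    "false")
  .set("spark.kubernetes.authenticate.resourceStagingServer.caCertFile",
    "/etc/kubernetes/ca.crt") // hypothetical path
  .set("spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile",
    "/etc/kubernetes/staging-server-token") // hypothetical path

Remember that an explicit token file takes precedence over the mounted service account token, and that oauthToken and oauthTokenFile are mutually exclusive.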

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala

Lines changed: 37 additions & 27 deletions
@@ -17,44 +17,42 @@
 
 package org.apache.spark.deploy.kubernetes
 
+import java.io.File
 import java.nio.ByteBuffer
 
 import io.fabric8.kubernetes.api.model.Pod
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientException, Watch, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.io.IOUtils
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver}
 import org.apache.spark.network.util.TransportConf
-import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider
 
 /**
  * An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes.
  * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
  */
 private[spark] class KubernetesShuffleBlockHandler (
     transportConf: TransportConf,
-    kubernetesClientProvider: DriverPodKubernetesClientProvider)
+    kubernetesClient: KubernetesClient)
   extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
 
   private val INIT_AND_STOP_LOCK = new Object
   private val CONNECTED_APPS_LOCK = new Object
   private val connectedApps = mutable.Set.empty[String]
   private var shuffleWatch: Option[Watch] = None
-  private var kubernetesClient: Option[KubernetesClient] = None
 
   def start(): Unit = INIT_AND_STOP_LOCK.synchronized {
-    val client = kubernetesClientProvider.get
-    shuffleWatch = startShuffleWatcher(client)
-    kubernetesClient = Some(client)
+    shuffleWatch = startShuffleWatcher()
   }
 
   override def close(): Unit = {
@@ -64,8 +62,7 @@ private[spark] class KubernetesShuffleBlockHandler (
       INIT_AND_STOP_LOCK.synchronized {
         shuffleWatch.foreach(IOUtils.closeQuietly)
         shuffleWatch = None
-        kubernetesClient.foreach(IOUtils.closeQuietly)
-        kubernetesClient = None
+        IOUtils.closeQuietly(kubernetesClient)
       }
     }
   }
@@ -90,9 +87,9 @@ private[spark] class KubernetesShuffleBlockHandler (
     }
   }
 
-  private def startShuffleWatcher(client: KubernetesClient): Option[Watch] = {
+  private def startShuffleWatcher(): Option[Watch] = {
     try {
-      Some(client
+      Some(kubernetesClient
         .pods()
         .withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
         .watch(new Watcher[Pod] {
@@ -137,42 +134,55 @@
  */
 private[spark] class KubernetesExternalShuffleService(
     conf: SparkConf,
-    securityManager: SecurityManager,
-    kubernetesClientProvider: DriverPodKubernetesClientProvider)
+    securityManager: SecurityManager)
   extends ExternalShuffleService(conf, securityManager) {
 
   private var shuffleBlockHandlers: mutable.Buffer[KubernetesShuffleBlockHandler] = _
   protected override def newShuffleBlockHandler(
       tConf: TransportConf): ExternalShuffleBlockHandler = {
-    val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClientProvider)
-    newBlockHandler.start()
-
-    // TODO: figure out a better way of doing this.
-    // This is necessary because the constructor is not called
-    // when this class is initialized through ExternalShuffleService.
-    if (shuffleBlockHandlers == null) {
+    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
+      conf.get(KUBERNETES_SHUFFLE_APISERVER_URI),
+      None,
+      APISERVER_AUTH_SHUFFLE_SERVICE_CONF_PREFIX,
+      conf,
+      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH))
+        .filter(_ => conf.get(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS)),
+      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))
+        .filter(_ => conf.get(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS)))
+    val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClient)
+    try {
+      newBlockHandler.start()
+      // TODO: figure out a better way of doing this.
+      // This is necessary because the constructor is not called
+      // when this class is initialized through ExternalShuffleService.
+      if (shuffleBlockHandlers == null) {
         shuffleBlockHandlers = mutable.Buffer.empty[KubernetesShuffleBlockHandler]
+      }
+      shuffleBlockHandlers += newBlockHandler
+      newBlockHandler
+    } catch {
+      case e: Throwable =>
+        logError("Failed to create Kubernetes shuffle block handler.", e)
+        newBlockHandler.close()
+        throw e
     }
-    shuffleBlockHandlers += newBlockHandler
-    newBlockHandler
   }
 
   override def stop(): Unit = {
     try {
       super.stop()
     } finally {
-      shuffleBlockHandlers.foreach(_.close())
+      if (shuffleBlockHandlers != null) {
+        shuffleBlockHandlers.foreach(_.close())
+      }
     }
   }
 }
 
 private[spark] object KubernetesExternalShuffleService extends Logging {
   def main(args: Array[String]): Unit = {
     ExternalShuffleService.main(args,
-      (conf: SparkConf, sm: SecurityManager) => {
-        val kubernetesClientProvider = new DriverPodKubernetesClientProvider(conf)
-        new KubernetesExternalShuffleService(conf, sm, kubernetesClientProvider)
-      })
+      (conf: SparkConf, sm: SecurityManager) => new KubernetesExternalShuffleService(conf, sm))
   }
 }

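The try/catch introduced in newShuffleBlockHandler above is a general safe-close pattern: if start() throws after the Kubernetes client has been built, the handler (and with it the client) is closed before the error propagates, so no watch or dispatcher threads leak. A standalone sketch of the pattern; the helper name is ours, not the patch's:

// Start a closeable resource; on failure, close it before rethrowing so that
// a half-initialized resource never leaks.
def startOrClose[T <: AutoCloseable](resource: T)(start: T => Unit): T = {
  try {
    start(resource)
    resource
  } catch {
    case e: Throwable =>
      resource.close()
      throw e
  }
}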
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
+ * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
+ * options for different components.
+ */
+private[spark] object SparkKubernetesClientFactory {
+
+  def createKubernetesClient(
+      master: String,
+      namespace: Option[String],
+      kubernetesAuthConfPrefix: String,
+      sparkConf: SparkConf,
+      maybeServiceAccountToken: Option[File],
+      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+    val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
+    val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
+    val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
+      .map(new File(_))
+      .orElse(maybeServiceAccountToken)
+    val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
+    OptionRequirements.requireNandDefined(
+      oauthTokenFile,
+      oauthTokenValue,
+      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
+        s" value $oauthTokenConf.")
+
+    val caCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
+      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+    val clientKeyFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
+    val clientCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
+    val dispatcher = new Dispatcher(
+      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(master)
+      .withWebsocketPingInterval(0)
+      .withOption(oauthTokenValue) {
+        (token, configBuilder) => configBuilder.withOauthToken(token)
+      }.withOption(oauthTokenFile) {
+        (file, configBuilder) =>
+          configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
+      }.withOption(caCertFile) {
+        (file, configBuilder) => configBuilder.withCaCertFile(file)
+      }.withOption(clientKeyFile) {
+        (file, configBuilder) => configBuilder.withClientKeyFile(file)
+      }.withOption(clientCertFile) {
+        (file, configBuilder) => configBuilder.withClientCertFile(file)
+      }.withOption(namespace) {
+        (ns, configBuilder) => configBuilder.withNamespace(ns)
+      }.build()
+    val baseHttpClient = HttpClientUtils.createHttpClient(config)
+    val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
+      .dispatcher(dispatcher)
+      .build()
+    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
+  }
+
+  private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {
+
+    def withOption[T]
+        (option: Option[T])
+        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = {
+      new OptionConfigurableConfigBuilder(option.map { opt =>
+        configurator(opt, configBuilder)
+      }.getOrElse(configBuilder))
+    }
+
+    def build(): Config = configBuilder.build()
+  }
+}
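The private implicit class at the end is what keeps client construction a single fluent expression: withOption applies a configurator only when the Option is defined and otherwise passes the builder through unchanged. A self-contained sketch of the same pattern, using hypothetical Settings types:

// Hypothetical types illustrating the withOption pattern from above.
object OptionBuilderExample extends App {

  final case class Settings(token: Option[String] = None, namespace: Option[String] = None)

  class SettingsBuilder(settings: Settings) {
    def withToken(token: String): SettingsBuilder =
      new SettingsBuilder(settings.copy(token = Some(token)))
    def withNamespace(ns: String): SettingsBuilder =
      new SettingsBuilder(settings.copy(namespace = Some(ns)))
    def build(): Settings = settings
  }

  // Same shape as OptionConfigurableConfigBuilder: configure only when the
  // option is defined; otherwise return the builder untouched.
  implicit class OptionalSettingsBuilder(builder: SettingsBuilder) {
    def withOption[T](option: Option[T])
        (configurator: (T, SettingsBuilder) => SettingsBuilder): SettingsBuilder = {
      option.map(configurator(_, builder)).getOrElse(builder)
    }
  }

  val settings = new SettingsBuilder(Settings())
    .withOption(sys.env.get("K8S_OAUTH_TOKEN")) { (t, b) => b.withToken(t) }
    .withOption(Some("spark-jobs")) { (ns, b) => b.withNamespace(ns) }
    .build()

  println(settings)
}

The alternative, a run of if statements mutating local state around the builder, breaks the chain apart; the implicit class keeps each optional credential a one-liner in createKubernetesClient.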

0 commit comments
