This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 5e2b205

Dynamic allocation, cleanup in case of driver death (#319)

* Adding cleanup for shuffle service for driver death
* Address comments + fix tests
* Cleanly open and close resources.
* Added unit test, reusing RegisterDriver
* lint + fix mesos

1 parent e5623b7 · commit 5e2b205

File tree: 13 files changed, +343 −19 lines
KubernetesExternalShuffleClient.java (new file, package org.apache.spark.network.shuffle.kubernetes)

Lines changed: 79 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle.kubernetes;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.RegisterDriver;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * A client for talking to the external shuffle service in Kubernetes cluster mode.
 *
 * This is used by each Spark executor to register with a corresponding external
 * shuffle service on the cluster. The purpose is for cleaning up shuffle files
 * reliably if the application exits unexpectedly.
 */
public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
  private static final Logger logger = LoggerFactory
    .getLogger(KubernetesExternalShuffleClient.class);

  /**
   * Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
   * Please refer to docs on {@link ExternalShuffleClient} for more information.
   */
  public KubernetesExternalShuffleClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean saslEnabled,
      boolean saslEncryptionEnabled) {
    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
  }

  public void registerDriverWithShuffleService(String host, int port) throws IOException {
    checkInit();
    ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer();
    TransportClient client = clientFactory.createClient(host, port);
    client.sendRpc(registerDriver, new RegisterDriverCallback());
  }

  private class RegisterDriverCallback implements RpcResponseCallback {
    @Override
    public void onSuccess(ByteBuffer response) {
      logger.info("Successfully registered app " + appId + " with external shuffle service.");
    }

    @Override
    public void onFailure(Throwable e) {
      logger.warn("Unable to register app " + appId + " with external shuffle service. " +
        "Please manually remove shuffle data after driver exit. Error: " + e);
    }
  }

  @Override
  public void close() {
    super.close();
  }
}
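For orientation, here is a minimal, hypothetical usage sketch of the client above, in Scala. Names not shown in the diff (transportConf, appId, host, port) are assumed to exist in the caller's scope; SASL is assumed disabled, and init() is the initializer inherited from ExternalShuffleClient that satisfies the checkInit() precondition.

// Hypothetical driver-side sketch; transportConf, appId, host and port are assumptions.
val shuffleClient = new KubernetesExternalShuffleClient(
  transportConf,
  null,   // SecretKeyHolder, assumed unused while SASL is disabled
  false,  // saslEnabled
  false)  // saslEncryptionEnabled
shuffleClient.init(appId)
// Fire-and-forget: success or failure is only surfaced through the
// RegisterDriverCallback log lines above.
shuffleClient.registerDriverWithShuffleService(host, port)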

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.shuffle.ExternalShuffleClient;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.shuffle.protocol.RegisterDriver;
 import org.apache.spark.network.util.TransportConf;
 
 /**

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
 import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
 
 /**
RegisterDriver.java (moved from package org.apache.spark.network.shuffle.protocol.mesos to org.apache.spark.network.shuffle.protocol)

Lines changed: 2 additions & 3 deletions

@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.shuffle.protocol.mesos;
+package org.apache.spark.network.shuffle.protocol;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 
 // Needed by ScalaDoc. See SPARK-7726
 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 /**
- * A message sent from the driver to register with the MesosExternalShuffleService.
+ * A message sent from the driver to register with an ExternalShuffleService.
  */
 public class RegisterDriver extends BlockTransferMessage {
   private final String appId;
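With RegisterDriver in the shared protocol package, both the Mesos and Kubernetes services can decode it through the common BlockTransferMessage machinery. A hedged round-trip sketch follows: Decoder.fromByteBuffer is the existing network-shuffle decoder, the second constructor argument is the heartbeat timeout (passed as 0 by the Kubernetes client above), and the app id is illustrative.

import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver}

// Encode as the driver does...
val wire = new RegisterDriver("app-1234", 0L).toByteBuffer
// ...then decode as a shuffle service does on receipt.
val decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire)
  .asInstanceOf[RegisterDriver]
assert(decoded.getAppId == "app-1234")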

conf/kubernetes-shuffle-service.yaml

Lines changed: 2 additions & 1 deletion
@@ -38,7 +38,8 @@ spec:
         # This is an official image that is built
         # from the dockerfiles/shuffle directory
         # in the spark distribution.
-        image: kubespark/spark-shuffle:v2.1.0-kubernetes-0.1.0-alpha.3
+        image: spark-shuffle:latest
+        imagePullPolicy: IfNotPresent
         volumeMounts:
         - mountPath: '/tmp'
           name: temp-volume

mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala

Lines changed: 2 additions & 1 deletion
@@ -29,7 +29,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.shuffle.protocol.RegisterDriver
+import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.ThreadUtils
KubernetesExternalShuffleService.scala (new file, package org.apache.spark.deploy.kubernetes)

Lines changed: 179 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.kubernetes

import java.nio.ByteBuffer

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.io.IOUtils
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver}
import org.apache.spark.network.util.TransportConf
import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider

/**
 * An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes.
 * It detects driver termination and triggers the cleanup callback on [[ExternalShuffleService]].
 */
private[spark] class KubernetesShuffleBlockHandler(
    transportConf: TransportConf,
    kubernetesClientProvider: DriverPodKubernetesClientProvider)
  extends ExternalShuffleBlockHandler(transportConf, null) with Logging {

  private val INIT_AND_STOP_LOCK = new Object
  private val CONNECTED_APPS_LOCK = new Object
  private val connectedApps = mutable.Set.empty[String]
  private var shuffleWatch: Option[Watch] = None
  private var kubernetesClient: Option[KubernetesClient] = None

  def start(): Unit = INIT_AND_STOP_LOCK.synchronized {
    val client = kubernetesClientProvider.get
    shuffleWatch = startShuffleWatcher(client)
    kubernetesClient = Some(client)
  }

  override def close(): Unit = {
    try {
      super.close()
    } finally {
      INIT_AND_STOP_LOCK.synchronized {
        shuffleWatch.foreach(IOUtils.closeQuietly)
        shuffleWatch = None
        kubernetesClient.foreach(IOUtils.closeQuietly)
        kubernetesClient = None
      }
    }
  }

  protected override def handleMessage(
      message: BlockTransferMessage,
      client: TransportClient,
      callback: RpcResponseCallback): Unit = {
    message match {
      case RegisterDriverParam(appId) =>
        val address = client.getSocketAddress
        logInfo(s"Received registration request from app $appId (remote address $address).")
        CONNECTED_APPS_LOCK.synchronized {
          if (connectedApps.contains(appId)) {
            logWarning(s"Received a registration request from app $appId, but it was already " +
              s"registered")
          }
          connectedApps += appId
        }
        callback.onSuccess(ByteBuffer.allocate(0))
      case _ => super.handleMessage(message, client, callback)
    }
  }

  private def startShuffleWatcher(client: KubernetesClient): Option[Watch] = {
    try {
      Some(client
        .pods()
        .withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
        .watch(new Watcher[Pod] {
          override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
            action match {
              case Action.DELETED | Action.ERROR =>
                val labels = p.getMetadata.getLabels
                if (labels.containsKey(SPARK_APP_ID_LABEL)) {
                  val appId = labels.get(SPARK_APP_ID_LABEL)
                  CONNECTED_APPS_LOCK.synchronized {
                    if (connectedApps.contains(appId)) {
                      connectedApps -= appId
                      applicationRemoved(appId, true)
                    }
                  }
                }
              case Action.ADDED | Action.MODIFIED =>
            }
          }

          override def onClose(e: KubernetesClientException): Unit = {}
        }))
    } catch {
      case throwable: Throwable =>
        logWarning(s"Shuffle service cannot access Kubernetes. " +
          s"Orphaned file cleanup is disabled.", throwable)
        None
    }
  }

  /** An extractor object for matching [[RegisterDriver]] messages. */
  private object RegisterDriverParam {
    def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
  }
}

/**
 * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
 * to associate with. This allows the shuffle service to detect when a driver is terminated
 * and clean up the associated shuffle files.
 */
private[spark] class KubernetesExternalShuffleService(
    conf: SparkConf,
    securityManager: SecurityManager,
    kubernetesClientProvider: DriverPodKubernetesClientProvider)
  extends ExternalShuffleService(conf, securityManager) {

  private var shuffleBlockHandlers: mutable.Buffer[KubernetesShuffleBlockHandler] = _

  protected override def newShuffleBlockHandler(
      tConf: TransportConf): ExternalShuffleBlockHandler = {
    val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClientProvider)
    newBlockHandler.start()

    // TODO: figure out a better way of doing this.
    // This is necessary because the constructor is not called
    // when this class is initialized through ExternalShuffleService.
    if (shuffleBlockHandlers == null) {
      shuffleBlockHandlers = mutable.Buffer.empty[KubernetesShuffleBlockHandler]
    }
    shuffleBlockHandlers += newBlockHandler
    newBlockHandler
  }

  override def stop(): Unit = {
    try {
      super.stop()
    } finally {
      shuffleBlockHandlers.foreach(_.close())
    }
  }
}

private[spark] object KubernetesExternalShuffleService extends Logging {
  def main(args: Array[String]): Unit = {
    ExternalShuffleService.main(args,
      (conf: SparkConf, sm: SecurityManager) => {
        val kubernetesClientProvider = new DriverPodKubernetesClientProvider(conf)
        new KubernetesExternalShuffleService(conf, sm, kubernetesClientProvider)
      })
  }
}
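The heart of the cleanup path is the fabric8 watch on driver pods. As a standalone, hedged sketch of that idiom (assuming a fabric8 KubernetesClient named client is in scope), here is the skeleton the handler builds on:

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// Watch only pods labeled as Spark drivers; DELETED/ERROR events are the
// signal that an application's shuffle files may be orphaned.
val watch = client.pods()
  .withLabel("spark-role", "driver")
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = action match {
      case Action.DELETED | Action.ERROR =>
        println(s"Driver pod ${pod.getMetadata.getName} is gone; trigger cleanup here.")
      case _ => // ADDED / MODIFIED: nothing to clean up
    }
    override def onClose(e: KubernetesClientException): Unit = ()
  })
// Close the watch when the service shuts down, as close() does above.
watch.close()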

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ package object constants {
   private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
   private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
   private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+  private[spark] val SPARK_ROLE_LABEL = "spark-role"
 
   // Credentials secrets
   private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 5 additions & 2 deletions
@@ -82,10 +82,13 @@ private[spark] class Client(
       s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
     require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key" +
       s" $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
-    val allLabels = parsedCustomLabels ++
-      Map(SPARK_APP_ID_LABEL -> kubernetesAppId, SPARK_APP_NAME_LABEL -> appName)
+    val allLabels = parsedCustomLabels ++ Map(
+      SPARK_APP_ID_LABEL -> kubernetesAppId,
+      SPARK_APP_NAME_LABEL -> appName,
+      SPARK_ROLE_LABEL -> "driver")
     val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
       customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+
     Utils.tryWithResource(kubernetesClientProvider.get) { kubernetesClient =>
       val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
         new EnvVarBuilder()
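The new spark-role=driver entry is the contract between the two sides of this commit: Client.scala stamps it onto every driver pod, and KubernetesShuffleBlockHandler watches on exactly that selector. A hedged sketch of inspecting those pods with the same selector (kubernetesClient is assumed to be a fabric8 client in scope):

import scala.collection.JavaConverters._
import org.apache.spark.deploy.kubernetes.constants._

// List the driver pods the shuffle service would be watching.
val driverPods = kubernetesClient.pods()
  .withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
  .list()
  .getItems
  .asScala
driverPods.foreach(pod => println(pod.getMetadata.getName))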

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala

Lines changed: 8 additions & 2 deletions
@@ -29,7 +29,10 @@ import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.util.ThreadUtils
 
-private[spark] class DriverPodKubernetesClientProvider(sparkConf: SparkConf, namespace: String) {
+private[spark] class DriverPodKubernetesClientProvider(
+    sparkConf: SparkConf,
+    namespace: Option[String] = None) {
+
   private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)
   private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
   private val oauthTokenFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN)

@@ -45,7 +48,10 @@ private[spark] class DriverPodKubernetesClientProvider(sparkConf: SparkConf, nam
     val baseClientConfigBuilder = new ConfigBuilder()
       .withApiVersion("v1")
       .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL)
-      .withNamespace(namespace)
+
+    // Build a namespaced client if specified.
+    val namespacedClientConfigBuilder = namespace
+      .map(baseClientConfigBuilder.withNamespace(_)).getOrElse(baseClientConfigBuilder)
 
     val configBuilder = oauthTokenFile
       .orElse(caCertFile)
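Making namespace an Option with a None default lets one provider class serve both callers: the shuffle service's main() above constructs it cluster-scoped (presumably so it can watch driver pods in any namespace), while other callers can still pin a client to a single namespace. A construction sketch, where the namespace value is illustrative:

// Cluster-scoped, as KubernetesExternalShuffleService.main constructs it:
val clusterScoped = new DriverPodKubernetesClientProvider(sparkConf)

// Namespace-scoped, for callers that should only see one namespace:
val namespaced = new DriverPodKubernetesClientProvider(sparkConf, Some("spark-apps"))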
