Skip to content

Commit c0e3a5e

Browse files
committed
Extract more of the shuffle management to a different class. (apache-spark-on-k8s#454)
* Extract more of the shuffle management to a different class. More efforts to reduce the complexity of the KubernetesClusterSchedulerBackend. The scheduler backend should not be concerned about anything other than the coordination of the executor lifecycle.
* Fix scalastyle
* Add override annotation
* Fix Java style
* Remove unused imports.
* Move volume index to the beginning to satisfy index
* Address PR comments.
1 parent f28cb17 commit c0e3a5e

File tree

8 files changed

+286
-262
lines changed

8 files changed

+286
-262
lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java

Lines changed: 5 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,64 +17,13 @@
1717

1818
package org.apache.spark.network.shuffle.kubernetes;
1919

20-
import org.apache.spark.network.client.RpcResponseCallback;
21-
import org.apache.spark.network.client.TransportClient;
22-
import org.apache.spark.network.sasl.SecretKeyHolder;
23-
import org.apache.spark.network.shuffle.ExternalShuffleClient;
24-
import org.apache.spark.network.shuffle.protocol.RegisterDriver;
25-
import org.apache.spark.network.util.TransportConf;
26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
28-
20+
import java.io.Closeable;
2921
import java.io.IOException;
30-
import java.nio.ByteBuffer;
31-
32-
/**
33-
* A client for talking to the external shuffle service in Kubernetes cluster mode.
34-
*
35-
* This is used by the each Spark executor to register with a corresponding external
36-
* shuffle service on the cluster. The purpose is for cleaning up shuffle files
37-
* reliably if the application exits unexpectedly.
38-
*/
39-
public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
40-
private static final Logger logger = LoggerFactory
41-
.getLogger(KubernetesExternalShuffleClient.class);
42-
43-
/**
44-
* Creates an Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
45-
* Please refer to docs on {@link ExternalShuffleClient} for more information.
46-
*/
47-
public KubernetesExternalShuffleClient(
48-
TransportConf conf,
49-
SecretKeyHolder secretKeyHolder,
50-
boolean authEnabled,
51-
long registrationTimeoutMs) {
52-
super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
53-
}
54-
55-
public void registerDriverWithShuffleService(String host, int port)
56-
throws IOException, InterruptedException {
57-
checkInit();
58-
ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer();
59-
TransportClient client = clientFactory.createClient(host, port);
60-
client.sendRpc(registerDriver, new RegisterDriverCallback());
61-
}
6222

63-
private class RegisterDriverCallback implements RpcResponseCallback {
64-
@Override
65-
public void onSuccess(ByteBuffer response) {
66-
logger.info("Successfully registered app " + appId + " with external shuffle service.");
67-
}
23+
public interface KubernetesExternalShuffleClient extends Closeable {
6824

69-
@Override
70-
public void onFailure(Throwable e) {
71-
logger.warn("Unable to register app " + appId + " with external shuffle service. " +
72-
"Please manually remove shuffle data after driver exit. Error: " + e);
73-
}
74-
}
25+
void init(String appId);
7526

76-
@Override
77-
public void close() {
78-
super.close();
79-
}
27+
void registerDriverWithShuffleService(String host, int port)
28+
throws IOException, InterruptedException;
8029
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.shuffle.kubernetes;
19+
20+
import org.apache.spark.network.client.RpcResponseCallback;
21+
import org.apache.spark.network.client.TransportClient;
22+
import org.apache.spark.network.sasl.SecretKeyHolder;
23+
import org.apache.spark.network.shuffle.ExternalShuffleClient;
24+
import org.apache.spark.network.shuffle.protocol.RegisterDriver;
25+
import org.apache.spark.network.util.TransportConf;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
32+
/**
33+
* A client for talking to the external shuffle service in Kubernetes cluster mode.
34+
*
35+
* This is used by each Spark executor to register with a corresponding external
36+
* shuffle service on the cluster. The purpose is for cleaning up shuffle files
37+
* reliably if the application exits unexpectedly.
38+
*/
39+
public class KubernetesExternalShuffleClientImpl
40+
extends ExternalShuffleClient implements KubernetesExternalShuffleClient {
41+
42+
private static final Logger logger = LoggerFactory
43+
.getLogger(KubernetesExternalShuffleClientImpl.class);
44+
45+
/**
46+
* Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
47+
* Please refer to docs on {@link ExternalShuffleClient} for more information.
48+
*/
49+
public KubernetesExternalShuffleClientImpl(
50+
TransportConf conf,
51+
SecretKeyHolder secretKeyHolder,
52+
boolean saslEnabled) {
53+
super(conf, secretKeyHolder, saslEnabled);
54+
}
55+
56+
@Override
57+
public void registerDriverWithShuffleService(String host, int port)
58+
throws IOException, InterruptedException {
59+
checkInit();
60+
ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer();
61+
TransportClient client = clientFactory.createClient(host, port);
62+
client.sendRpc(registerDriver, new RegisterDriverCallback());
63+
}
64+
65+
private class RegisterDriverCallback implements RpcResponseCallback {
66+
@Override
67+
public void onSuccess(ByteBuffer response) {
68+
logger.info("Successfully registered app " + appId + " with external shuffle service.");
69+
}
70+
71+
@Override
72+
public void onFailure(Throwable e) {
73+
logger.warn("Unable to register app " + appId + " with external shuffle service. " +
74+
"Please manually remove shuffle data after driver exit. Error: " + e);
75+
}
76+
}
77+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ private[spark] trait ExecutorPodFactory {
3636
applicationId: String,
3737
driverUrl: String,
3838
executorEnvs: Seq[(String, String)],
39-
shuffleServiceConfig: Option[ShuffleServiceConfig],
4039
driverPod: Pod,
4140
nodeToLocalTaskCount: Map[String, Int]): Pod
4241
}
@@ -47,7 +46,8 @@ private[spark] class ExecutorPodFactoryImpl(
4746
mountSecretsBootstrap: Option[MountSecretsBootstrap],
4847
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
4948
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
50-
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
49+
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
50+
shuffleManager: Option[KubernetesExternalShuffleManager])
5151
extends ExecutorPodFactory {
5252

5353
import ExecutorPodFactoryImpl._
@@ -111,7 +111,6 @@ private[spark] class ExecutorPodFactoryImpl(
111111
applicationId: String,
112112
driverUrl: String,
113113
executorEnvs: Seq[(String, String)],
114-
shuffleServiceConfig: Option[ShuffleServiceConfig],
115114
driverPod: Pod,
116115
nodeToLocalTaskCount: Map[String, Int]): Pod = {
117116
val name = s"$executorPodNamePrefix-exec-$executorId"
@@ -179,6 +178,9 @@ private[spark] class ExecutorPodFactoryImpl(
179178
.withContainerPort(port._2)
180179
.build()
181180
})
181+
val shuffleVolumesWithMounts =
182+
shuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts)
183+
.getOrElse(Seq.empty)
182184

183185
val executorContainer = new ContainerBuilder()
184186
.withName(s"executor")
@@ -191,6 +193,7 @@ private[spark] class ExecutorPodFactoryImpl(
191193
.endResources()
192194
.addAllToEnv(executorEnv.asJava)
193195
.withPorts(requiredPorts.asJava)
196+
.addAllToVolumeMounts(shuffleVolumesWithMounts.map(_._2).asJava)
194197
.build()
195198

196199
val executorPod = new PodBuilder()
@@ -211,6 +214,7 @@ private[spark] class ExecutorPodFactoryImpl(
211214
.withHostname(hostname)
212215
.withRestartPolicy("Never")
213216
.withNodeSelector(nodeSelector.asJava)
217+
.addAllToVolumes(shuffleVolumesWithMounts.map(_._1).asJava)
214218
.endSpec()
215219
.build()
216220

@@ -226,42 +230,15 @@ private[spark] class ExecutorPodFactoryImpl(
226230
.build()
227231
}.getOrElse(executorContainer)
228232

229-
val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config =>
230-
config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) =>
231-
new ContainerBuilder(container)
232-
.addNewVolumeMount()
233-
.withName(FilenameUtils.getBaseName(dir))
234-
.withMountPath(dir)
235-
.endVolumeMount()
236-
.build()
237-
}
238-
}.getOrElse(containerWithExecutorLimitCores)
239-
val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config =>
240-
config.shuffleDirs.foldLeft(executorPod) { (builder, dir) =>
241-
new PodBuilder(builder)
242-
.editSpec()
243-
.addNewVolume()
244-
.withName(FilenameUtils.getBaseName(dir))
245-
.withNewHostPath()
246-
.withPath(dir)
247-
.endHostPath()
248-
.endVolume()
249-
.endSpec()
250-
.build()
251-
}
252-
}.getOrElse(executorPod)
253-
254233
val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
255234
mountSecretsBootstrap.map {bootstrap =>
256-
bootstrap.mountSecrets(withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
257-
}.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
258-
235+
bootstrap.mountSecrets(executorPod, containerWithExecutorLimitCores)
236+
}.getOrElse((executorPod, containerWithExecutorLimitCores))
259237
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
260238
mountSmallFilesBootstrap.map { bootstrap =>
261239
bootstrap.mountSmallFilesSecret(
262240
withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)
263241
}.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer))
264-
265242
val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
266243
executorInitContainerBootstrap.map { bootstrap =>
267244
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ import org.apache.spark.deploy.kubernetes.config._
2626
import org.apache.spark.deploy.kubernetes.constants._
2727
import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
2828
import org.apache.spark.internal.Logging
29+
import org.apache.spark.network.netty.SparkTransportConf
30+
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
2931
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
32+
import org.apache.spark.util.Utils
3033

3134
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
3235

@@ -109,17 +112,31 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
109112
sparkConf,
110113
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
111114
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
115+
116+
val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) {
117+
val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
118+
SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
119+
sc.env.securityManager,
120+
sc.env.securityManager.isAuthenticationEnabled())
121+
Some(new KubernetesExternalShuffleManagerImpl(
122+
sparkConf,
123+
kubernetesClient,
124+
kubernetesExternalShuffleClient))
125+
} else None
126+
112127
val executorPodFactory = new ExecutorPodFactoryImpl(
113128
sparkConf,
114129
NodeAffinityExecutorPodModifierImpl,
115130
mountSecretBootstrap,
116131
mountSmallFilesBootstrap,
117132
executorInitContainerBootstrap,
118-
executorInitContainerSecretVolumePlugin)
133+
executorInitContainerSecretVolumePlugin,
134+
kubernetesShuffleManager)
119135
new KubernetesClusterSchedulerBackend(
120136
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],
121137
sc,
122138
executorPodFactory,
139+
kubernetesShuffleManager,
123140
kubernetesClient)
124141
}
125142

0 commit comments

Comments
 (0)