Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 546f09c

Browse files
kimoonkim authored and ash211 committed
Dispatch tasks to right executors that have tasks' input HDFS data (#216)
* Dispatch tasks to right executors that have tasks' input HDFS data on local disks
* Fix style issues
* Clean up unnecessary fields
* Clean up a misleading method name
* Address review comments
* Fix import ordering
* Delete executor pods in watcher
* Fix the driver hang by unblocking the main thread
* Fix import order
* Clear runningExecutorPods
* Fix incorrect merge
* Address review comments
* Clean up imports
1 parent 26f747e commit 546f09c

File tree

6 files changed

+172
-12
lines changed

6 files changed

+172
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ private[spark] class TaskSetManager(
221221
* Return the pending tasks list for a given host, or an empty list if
222222
* there is no map entry for that host
223223
*/
224-
private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
224+
protected def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
225225
pendingTasksForHost.getOrElse(host, ArrayBuffer())
226226
}
227227

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ import java.io.File
2121
import com.google.common.base.Charsets
2222
import com.google.common.io.Files
2323
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient}
24+
import io.fabric8.kubernetes.client.utils.HttpClientUtils
25+
import okhttp3.Dispatcher
2426

2527
import org.apache.spark.SparkConf
2628
import org.apache.spark.deploy.kubernetes.config._
2729
import org.apache.spark.deploy.kubernetes.constants._
30+
import org.apache.spark.util.ThreadUtils
2831

2932
private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) {
3033
private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)
@@ -78,6 +81,17 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St
7881
}
7982
serviceAccountConfigBuilder
8083
}
81-
new DefaultKubernetesClient(configBuilder.build)
84+
// Disable the ping thread that is not daemon, in order to allow
85+
// the driver main thread to shut down upon errors. Otherwise, the driver
86+
// will hang indefinitely.
87+
val config = configBuilder
88+
.withWebsocketPingInterval(0)
89+
.build()
90+
val httpClient = HttpClientUtils.createHttpClient(config).newBuilder()
91+
// Use a Dispatcher with a custom executor service that creates daemon threads. The default
92+
// executor service used by Dispatcher creates non-daemon threads.
93+
.dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s")))
94+
.build()
95+
new DefaultKubernetesClient(httpClient, config)
8296
}
8397
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager {
2424
override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
2525

2626
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
27-
val scheduler = new TaskSchedulerImpl(sc)
27+
val scheduler = new KubernetesTaskSchedulerImpl(sc)
2828
sc.taskScheduler = scheduler
2929
scheduler
3030
}
@@ -37,6 +37,5 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager {
3737
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
3838
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
3939
}
40-
4140
}
4241

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616
*/
1717
package org.apache.spark.scheduler.cluster.kubernetes
1818

19-
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
19+
import java.io.Closeable
20+
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
2021

21-
import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
22-
EnvVarSourceBuilder, Pod, QuantityBuilder}
2322
import scala.collection.JavaConverters._
23+
import scala.collection.mutable
2424
import scala.concurrent.{ExecutionContext, Future}
2525

26+
import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
27+
EnvVarSourceBuilder, Pod, QuantityBuilder}
28+
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
29+
import io.fabric8.kubernetes.client.Watcher.Action
30+
2631
import org.apache.spark.{SparkContext, SparkException}
2732
import org.apache.spark.deploy.kubernetes.config._
2833
import org.apache.spark.deploy.kubernetes.constants._
@@ -38,8 +43,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
3843

3944
import KubernetesClusterSchedulerBackend._
4045

41-
private val EXECUTOR_MODIFICATION_LOCK = new Object
42-
private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
46+
private val RUNNING_EXECUTOR_PODS_LOCK = new Object
47+
private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
48+
49+
private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
50+
private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
4351

4452
private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
4553
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
@@ -87,6 +95,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
8795
super.minRegisteredRatio
8896
}
8997

98+
private val executorWatchResource = new AtomicReference[Closeable]
9099
protected var totalExpectedExecutors = new AtomicInteger(0)
91100

92101
private val driverUrl = RpcEndpointAddress(
@@ -119,6 +128,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
119128

120129
override def start(): Unit = {
121130
super.start()
131+
executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId())
132+
.watch(new ExecutorPodsWatcher()))
122133
if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
123134
doRequestTotalExecutors(initialExecutors)
124135
}
@@ -133,11 +144,22 @@ private[spark] class KubernetesClusterSchedulerBackend(
133144
// When using Utils.tryLogNonFatalError some of the code fails but without any logs or
134145
// indication as to why.
135146
try {
136-
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
147+
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
148+
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
149+
runningExecutorPods.clear()
150+
}
151+
EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
152+
executorPodsByIPs.clear()
153+
}
154+
val resource = executorWatchResource.getAndSet(null)
155+
if (resource != null) {
156+
resource.close()
157+
}
137158
} catch {
138159
case e: Throwable => logError("Uncaught exception while shutting down controllers.", e)
139160
}
140161
try {
162+
logInfo("Closing kubernetes client")
141163
kubernetesClient.close()
142164
} catch {
143165
case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
@@ -231,7 +253,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
231253
}
232254

233255
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
234-
EXECUTOR_MODIFICATION_LOCK.synchronized {
256+
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
235257
if (requestedTotal > totalExpectedExecutors.get) {
236258
logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}"
237259
+ s" additional executors, expecting total $requestedTotal and currently" +
@@ -246,7 +268,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
246268
}
247269

248270
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
249-
EXECUTOR_MODIFICATION_LOCK.synchronized {
271+
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
250272
for (executor <- executorIds) {
251273
runningExecutorPods.remove(executor) match {
252274
case Some(pod) => kubernetesClient.pods().delete(pod)
@@ -256,6 +278,41 @@ private[spark] class KubernetesClusterSchedulerBackend(
256278
}
257279
true
258280
}
281+
282+
/**
 * Looks up the executor pod currently recorded under the given pod IP address.
 *
 * The read is performed while holding `EXECUTOR_PODS_BY_IPS_LOCK`, the same lock the
 * pod watcher takes when it adds or removes entries.
 *
 * @param podIP IP address of the executor pod to look up
 * @return the pod registered for `podIP`, or `None` when no entry exists
 */
def getExecutorPodByIP(podIP: String): Option[Pod] =
  EXECUTOR_PODS_BY_IPS_LOCK.synchronized(executorPodsByIPs.get(podIP))
287+
288+
/**
 * Watcher that keeps `executorPodsByIPs` in step with executor pod events.
 *
 * A pod is indexed by its IP once a MODIFIED event reports it in the "Running" phase
 * without a deletion timestamp. It is dropped from the index when it is being deleted
 * (MODIFIED with a deletion timestamp), or on a DELETED or ERROR event.
 */
private class ExecutorPodsWatcher extends Watcher[Pod] {

  /**
   * Updates the pod-IP index according to the received event.
   *
   * @param action kind of event reported by the watch
   * @param pod pod the event refers to
   */
  override def eventReceived(action: Action, pod: Pod): Unit = {
    // Evaluated on demand only, so events that match neither branch (e.g. ADDED)
    // never touch the pod object, exactly as before.
    def beingDeleted: Boolean = pod.getMetadata.getDeletionTimestamp != null
    // The status and its fields may not be populated yet; treat them as optional
    // instead of dereferencing them blindly.
    def podStatus = Option(pod.getStatus)
    def isRunning: Boolean = podStatus.exists(_.getPhase == "Running")
    def podIP: Option[String] = podStatus.flatMap(status => Option(status.getPodIP))

    if (action == Action.MODIFIED && isRunning && !beingDeleted) {
      val podName = pod.getMetadata.getName
      val clusterNodeName = Option(pod.getSpec).map(_.getNodeName).orNull
      podIP match {
        case Some(ip) =>
          // Log the pod name rather than the whole Pod object to keep the message readable.
          logDebug(s"Executor pod $podName ready, launched at $clusterNodeName as IP $ip.")
          EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
            executorPodsByIPs += ((ip, pod))
          }
        case None =>
          // Never index a pod under a null key; a later event will carry the IP.
          logDebug(s"Executor pod $podName is running at $clusterNodeName but has no IP yet.")
      }
    } else if ((action == Action.MODIFIED && beingDeleted) ||
        action == Action.DELETED || action == Action.ERROR) {
      val podName = pod.getMetadata.getName
      val maybeIP = podIP
      logDebug(s"Executor pod $podName at IP ${maybeIP.orNull} was at $action.")
      maybeIP.foreach { ip =>
        EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
          executorPodsByIPs -= ip
        }
      }
    }
  }

  /**
   * Invoked when the watch is closed; only logs the cause.
   *
   * @param cause exception passed in by the client when the watch closes
   */
  override def onClose(cause: KubernetesClientException): Unit = {
    logDebug("Executor pod watch closed.", cause)
  }
}
259316
}
260317

261318
private object KubernetesClusterSchedulerBackend {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.scheduler.cluster.kubernetes
18+
19+
import org.apache.spark.SparkContext
20+
import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
21+
22+
/**
 * Task scheduler used for Kubernetes clusters. It differs from [[TaskSchedulerImpl]] only
 * in the task set manager it creates: every task set is handled by a
 * [[KubernetesTaskSetManager]].
 */
private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Builds the manager for a task set.
   *
   * @param taskSet task set to be managed
   * @param maxTaskFailures failure limit handed to the manager unchanged
   * @return a new [[KubernetesTaskSetManager]] bound to this scheduler
   */
  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager =
    new KubernetesTaskSetManager(this, taskSet, maxTaskFailures)
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.scheduler.cluster.kubernetes
18+
19+
import scala.collection.mutable.ArrayBuffer
20+
21+
import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
22+
23+
/**
 * Task set manager that resolves pending tasks for an executor not only by the executor
 * pod IP, but also by the cluster node the pod runs on.
 */
private[spark] class KubernetesTaskSetManager(
    sched: TaskSchedulerImpl,
    taskSet: TaskSet,
    maxTaskFailures: Int) extends TaskSetManager(sched, taskSet, maxTaskFailures) {

  /**
   * Overrides the lookup to use not only the executor pod IP, but also the cluster node
   * name and host IP address that the pod is running on. The base class may have populated
   * the lookup target map with HDFS datanode locations if this task set reads HDFS data.
   * Those datanode locations are based on cluster node names or host IP addresses. Using
   * only executor pod IPs may not match them.
   *
   * Lookup order: executor pod IP, then cluster node name, then cluster node IP. The first
   * non-empty list wins.
   *
   * @param executorIP IP address of the executor pod
   * @return the pending task indices found for the executor, or an empty buffer when none
   *         of the locations has pending tasks
   */
  override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
    val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP)
    if (pendingTasksExecutorIP.nonEmpty) {
      pendingTasksExecutorIP
    } else {
      val pendingTasksForNode = lookupExecutorPod(executorIP).flatMap { pod =>
        // Spec and status are wrapped in Option so that a missing field skips that
        // candidate instead of failing with a NullPointerException.
        val candidates = Seq(
          "cluster node name" -> Option(pod.getSpec).flatMap(spec => Option(spec.getNodeName)),
          "cluster node IP" -> Option(pod.getStatus).flatMap(status => Option(status.getHostIP)))
        // The iterator keeps the lookups lazy: the node IP is only consulted when the
        // node name yields nothing.
        candidates.iterator
          .collect { case (kind, Some(location)) => (kind, location, pendingTasksAt(location)) }
          .find { case (_, _, tasks) => tasks.nonEmpty }
          .map { case (kind, location, tasks) =>
            logDebug(s"Got preferred task list $tasks for executor host " +
              s"$executorIP using $kind $location")
            tasks
          }
      }
      pendingTasksForNode.getOrElse(pendingTasksExecutorIP) // Empty when nothing matched
    }
  }

  /** Pending tasks the base class has registered for `host`. */
  private def pendingTasksAt(host: String): ArrayBuffer[Int] = super.getPendingTasksForHost(host)

  /**
   * Finds the executor pod for the given IP through the Kubernetes scheduler backend.
   * Yields `None` when the scheduler runs on a different backend, rather than failing
   * on an unchecked cast.
   */
  private def lookupExecutorPod(executorIP: String) = sched.backend match {
    case kubernetesBackend: KubernetesClusterSchedulerBackend =>
      kubernetesBackend.getExecutorPodByIP(executorIP)
    case _ => None
  }
}

0 commit comments

Comments
 (0)