This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit c457f10

kimoonkim authored and ash211 committed
Support HDFS rack locality (#350)
* Support HDFS rack locality
* Fix unit tests
* Address review comments
* Address some review comments
* Use traits for InetAddress and RackResolver util classes
* Disables expensive DNS lookup by default
1 parent 6e1d69e commit c457f10
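
The rack-resolution path stays inert unless Hadoop's topology mapping is configured, and the reverse-DNS fallback is further gated behind a driver flag that this commit turns off by default. As a minimal sketch of the Hadoop-side setup the feature expects (the script path is an illustrative assumption, not part of this commit):

import org.apache.hadoop.conf.Configuration

// "net.topology.script.file.name" is the standard Hadoop key that
// RackResolverUtilImpl.checkConfigured inspects (via
// CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY).
val hadoopConf = new Configuration()
hadoopConf.set("net.topology.script.file.name", "/etc/hadoop/topology.sh")  // assumed path

With a script (or table) mapping named, the scheduler's getRackForHost below consults the resolver; otherwise it short-circuits to None.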

File tree

7 files changed: +368 -14 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.kubernetes

import java.net.InetAddress

/**
 * Gets full host names of given IP addresses from DNS.
 */
private[kubernetes] trait InetAddressUtil {

  def getFullHostName(ipAddress: String): String
}

private[kubernetes] object InetAddressUtilImpl extends InetAddressUtil {

  // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress
  // class for both hits and misses.
  override def getFullHostName(ipAddress: String): String = {
    InetAddress.getByName(ipAddress).getCanonicalHostName
  }
}
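
The trait/object split above exists so unit tests can swap in a fake resolver while production code keeps a stateless singleton. A small sketch; the IP and host name are the sample values from the test suite below, not guaranteed DNS entries:

// Production: the singleton issues a real reverse lookup; InetAddress caches
// both hits and misses internally.
val fullName = InetAddressUtilImpl.getFullHostName("192.168.1.5")

// Tests: any InetAddressUtil implementation avoids the network entirely.
val fakeInetAddressUtil = new InetAddressUtil {
  override def getFullHostName(ipAddress: String): String = "kube-node2.mydomain.com"
}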

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala

Lines changed: 49 additions & 2 deletions
@@ -16,12 +16,59 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager}
+import org.apache.spark.util.Utils
 import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
 
-private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+private[spark] class KubernetesTaskSchedulerImpl(
+    sc: SparkContext,
+    rackResolverUtil: RackResolverUtil,
+    inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSchedulerImpl(sc) {
 
+  var kubernetesSchedulerBackend: KubernetesClusterSchedulerBackend = null
+
+  def this(sc: SparkContext) = this(sc, new RackResolverUtilImpl(sc.hadoopConfiguration))
+
+  override def initialize(backend: SchedulerBackend): Unit = {
+    super.initialize(backend)
+    kubernetesSchedulerBackend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend]
+  }
   override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
     new KubernetesTaskSetManager(this, taskSet, maxTaskFailures)
   }
+
+  override def getRackForHost(hostPort: String): Option[String] = {
+    if (!rackResolverUtil.isConfigured) {
+      // Only calls resolver when it is configured to avoid sending DNS queries for cluster nodes.
+      // See InetAddressUtil for details.
+      None
+    } else {
+      getRackForDatanodeOrExecutor(hostPort)
+    }
+  }
+
+  private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = {
+    val host = Utils.parseHostPort(hostPort)._1
+    val executorPod = kubernetesSchedulerBackend.getExecutorPodByIP(host)
+    val hadoopConfiguration = sc.hadoopConfiguration
+    executorPod.map(
+      pod => {
+        val clusterNodeName = pod.getSpec.getNodeName
+        val rackByNodeName = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeName)
+        rackByNodeName.orElse({
+          val clusterNodeIP = pod.getStatus.getHostIP
+          val rackByNodeIP = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeIP)
+          rackByNodeIP.orElse({
+            if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) {
+              val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
+              rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeFullName)
+            } else {
+              Option.empty
+            }
+          })
+        })
+      }
+    ).getOrElse(rackResolverUtil.resolveRack(hadoopConfiguration, host))
+  }
 }
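
Restating the lookup order in getRackForDatanodeOrExecutor: a host that maps to an executor pod is tried by its Kubernetes node name, then by the node's host IP, and only when KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED is set by the node's full DNS name; a host with no matching pod (an HDFS datanode, say) is resolved directly. A worked trace with the sample values from the test suite below:

// Executor at 10.0.1.1 runs on node "kube-node2" whose host IP is 192.168.1.5.
sched.getRackForHost("10.0.1.1:7079")
// 1. resolveRack(conf, "kube-node2")               -> None (name not in topology)
// 2. resolveRack(conf, "192.168.1.5")              -> None (IP not in topology)
// 3. getFullHostName("192.168.1.5")                -> "kube-node2.mydomain.com" (DNS; only if enabled)
// 4. resolveRack(conf, "kube-node2.mydomain.com")  -> Some("/rack2")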

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala

Lines changed: 1 addition & 12 deletions
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import java.net.InetAddress
-
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.deploy.kubernetes.config._
@@ -27,7 +25,7 @@ private[spark] class KubernetesTaskSetManager(
     sched: TaskSchedulerImpl,
     taskSet: TaskSet,
     maxTaskFailures: Int,
-    inetAddressUtil: InetAddressUtil = new InetAddressUtil)
+    inetAddressUtil: InetAddressUtil = InetAddressUtilImpl)
   extends TaskSetManager(sched, taskSet, maxTaskFailures) {
 
   private val conf = sched.sc.conf
@@ -83,12 +81,3 @@
   }
 }
 
-// To support mocks in unit tests.
-private[kubernetes] class InetAddressUtil {
-
-  // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress
-  // class for both hits and misses.
-  def getFullHostName(ipAddress: String): String = {
-    InetAddress.getByName(ipAddress).getCanonicalHostName
-  }
-}
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
/* ... Apache License, Version 2.0 header (same as above) ... */
package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}

/**
 * Finds rack names that cluster nodes belong to in order to support HDFS rack locality.
 */
private[kubernetes] trait RackResolverUtil {

  def isConfigured() : Boolean

  def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String]
}

private[kubernetes] class RackResolverUtilImpl(hadoopConfiguration: Configuration)
  extends RackResolverUtil {

  val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName
  val tablePlugin : String = classOf[TableMapping].getCanonicalName
  val isResolverConfigured : Boolean = checkConfigured(hadoopConfiguration)

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  override def isConfigured() : Boolean = isResolverConfigured

  def checkConfigured(hadoopConfiguration: Configuration): Boolean = {
    val plugin = hadoopConfiguration.get(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin)
    val scriptName = hadoopConfiguration.get(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "")
    val tableName = hadoopConfiguration.get(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "")
    plugin == scriptPlugin && scriptName.nonEmpty ||
      plugin == tablePlugin && tableName.nonEmpty ||
      plugin != scriptPlugin && plugin != tablePlugin
  }

  override def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = {
    val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation)
    if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) {
      rack
    } else {
      None
    }
  }
}
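
checkConfigured treats topology mapping as configured when the script plugin names a script file, the table plugin names a table file, or a custom plugin class is set; resolveRack then discards Hadoop's catch-all /default-rack so unknown hosts yield None. A hedged sketch of that behavior against a default Hadoop Configuration (the script path is an assumption):

import org.apache.hadoop.conf.Configuration

// Fresh configuration: ScriptBasedMapping is the default plugin but no script
// file is named, so the resolver reports itself unconfigured.
val bare = new RackResolverUtilImpl(new Configuration())
assert(!bare.isConfigured())

// Naming a topology script flips it to configured.
val withScript = new Configuration()
withScript.set("net.topology.script.file.name", "/etc/hadoop/topology.sh")  // assumed path
assert(new RackResolverUtilImpl(withScript).isConfigured())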
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala

Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
/* ... Apache License, Version 2.0 header (same as above) ... */
package org.apache.spark.scheduler.cluster.kubernetes

import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.scheduler.FakeTask
import org.scalatest.BeforeAndAfter

class KubernetesTaskSchedulerImplSuite extends SparkFunSuite with BeforeAndAfter {

  SparkContext.clearActiveContext()
  val sc = new SparkContext("local", "test")
  val backend = mock(classOf[KubernetesClusterSchedulerBackend])

  before {
    sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)
  }

  test("Create a k8s task set manager") {
    val sched = new KubernetesTaskSchedulerImpl(sc)
    sched.kubernetesSchedulerBackend = backend
    val taskSet = FakeTask.createTaskSet(0)

    val manager = sched.createTaskSetManager(taskSet, maxTaskFailures = 3)
    assert(manager.isInstanceOf[KubernetesTaskSetManager])
  }

  test("Gets racks for datanodes") {
    val rackResolverUtil = mock(classOf[RackResolverUtil])
    when(rackResolverUtil.isConfigured).thenReturn(true)
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1"))
      .thenReturn(Option("/rack1"))
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2"))
      .thenReturn(Option("/rack2"))
    val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil)
    sched.kubernetesSchedulerBackend = backend
    when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None)
    when(backend.getExecutorPodByIP("kube-node2")).thenReturn(None)

    assert(sched.getRackForHost("kube-node1:60010") == Option("/rack1"))
    assert(sched.getRackForHost("kube-node2:60010") == Option("/rack2"))
  }

  test("Gets racks for executor pods") {
    sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true)
    val rackResolverUtil = mock(classOf[RackResolverUtil])
    when(rackResolverUtil.isConfigured).thenReturn(true)
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1"))
      .thenReturn(Option("/rack1"))
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com"))
      .thenReturn(Option("/rack2"))
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2"))
      .thenReturn(None)
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5"))
      .thenReturn(None)
    val inetAddressUtil = mock(classOf[InetAddressUtil])
    val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil)
    sched.kubernetesSchedulerBackend = backend

    val spec1 = mock(classOf[PodSpec])
    when(spec1.getNodeName).thenReturn("kube-node1")
    val status1 = mock(classOf[PodStatus])
    when(status1.getHostIP).thenReturn("192.168.1.4")
    val pod1 = mock(classOf[Pod])
    when(pod1.getSpec).thenReturn(spec1)
    when(pod1.getStatus).thenReturn(status1)
    when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))

    val spec2 = mock(classOf[PodSpec])
    when(spec2.getNodeName).thenReturn("kube-node2")
    val status2 = mock(classOf[PodStatus])
    when(status2.getHostIP).thenReturn("192.168.1.5")
    val pod2 = mock(classOf[Pod])
    when(pod2.getSpec).thenReturn(spec2)
    when(pod2.getStatus).thenReturn(status2)
    when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com")
    when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2))

    assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1"))
    assert(sched.getRackForHost("10.0.1.1:7079") == Option("/rack2"))

    verify(inetAddressUtil, times(1)).getFullHostName(anyString())
  }

  test("Gets racks for executor pods while disabling DNS lookup") {
    sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false)
    val rackResolverUtil = mock(classOf[RackResolverUtil])
    when(rackResolverUtil.isConfigured).thenReturn(true)
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1"))
      .thenReturn(Option("/rack1"))
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com"))
      .thenReturn(Option("/rack2"))
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2"))
      .thenReturn(None)
    when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5"))
      .thenReturn(None)
    val inetAddressUtil = mock(classOf[InetAddressUtil])
    val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil)
    sched.kubernetesSchedulerBackend = backend

    val spec1 = mock(classOf[PodSpec])
    when(spec1.getNodeName).thenReturn("kube-node1")
    val status1 = mock(classOf[PodStatus])
    when(status1.getHostIP).thenReturn("192.168.1.4")
    val pod1 = mock(classOf[Pod])
    when(pod1.getSpec).thenReturn(spec1)
    when(pod1.getStatus).thenReturn(status1)
    when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))

    val spec2 = mock(classOf[PodSpec])
    when(spec2.getNodeName).thenReturn("kube-node2")
    val status2 = mock(classOf[PodStatus])
    when(status2.getHostIP).thenReturn("192.168.1.5")
    val pod2 = mock(classOf[Pod])
    when(pod2.getSpec).thenReturn(spec2)
    when(pod2.getStatus).thenReturn(status2)
    when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com")
    when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2))

    assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1"))
    assert(sched.getRackForHost("10.0.1.1:7079") == None)

    verify(inetAddressUtil, never).getFullHostName(anyString())
  }

  test("Does not get racks if plugin is not configured") {
    val rackResolverUtil = mock(classOf[RackResolverUtil])
    when(rackResolverUtil.isConfigured()).thenReturn(false)
    val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil)
    sched.kubernetesSchedulerBackend = backend
    when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None)

    val spec1 = mock(classOf[PodSpec])
    when(spec1.getNodeName).thenReturn("kube-node1")
    val status1 = mock(classOf[PodStatus])
    when(status1.getHostIP).thenReturn("192.168.1.4")
    val pod1 = mock(classOf[Pod])
    when(pod1.getSpec).thenReturn(spec1)
    when(pod1.getStatus).thenReturn(status1)
    when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))

    assert(sched.getRackForHost("kube-node1:60010").isEmpty)
    assert(sched.getRackForHost("10.0.0.1:7079").isEmpty)
  }
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation
 
 class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter {
 
+  SparkContext.clearActiveContext()
   val sc = new SparkContext("local", "test")
   val sched = new FakeTaskScheduler(sc,
     ("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3"))
