
Commit e3cfaa4

kimoonkim authored and erikerlandson committed
Flag-guard expensive DNS lookup of cluster node full names, part of HDFS locality support (#412)
* Flag-guard expensive DNS lookup of cluster node full names, part of HDFS locality support
* Clean up a bit
* Improve unit tests
1 parent 5fdaa7f commit e3cfaa4

3 files changed: 61 additions, 7 deletions

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 13 additions & 0 deletions
@@ -491,6 +491,19 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED =
+    ConfigBuilder("spark.kubernetes.driver.hdfslocality.clusterNodeNameDNSLookup.enabled")
+      .doc("Whether or not HDFS locality support code should look up DNS for full hostnames of" +
+        " cluster nodes. In some K8s clusters, notably GKE, cluster node names are short" +
+        " hostnames, and so comparing them against HDFS datanode hostnames always fails." +
+        " To fix this, enable the flag. It is disabled by default because DNS lookup can be" +
+        " expensive: the driver can slow down and fail to respond to executor heartbeats" +
+        " in time. If you enable this flag, make sure your DNS server has enough capacity" +
+        " for the workload.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
       .doc("Specify the hard cpu limit for a single executor pod")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala

Lines changed: 15 additions & 6 deletions
@@ -20,6 +20,7 @@ import java.net.InetAddress
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
 
 private[spark] class KubernetesTaskSetManager(
@@ -29,6 +30,8 @@ private[spark] class KubernetesTaskSetManager(
     inetAddressUtil: InetAddressUtil = new InetAddressUtil)
   extends TaskSetManager(sched, taskSet, maxTaskFailures) {
 
+  private val conf = sched.sc.conf
+
   /**
    * Overrides the lookup to use not only the executor pod IP, but also the cluster node
    * name and host IP address that the pod is running on. The base class may have populated
@@ -58,13 +61,19 @@ private[spark] class KubernetesTaskSetManager(
             s"$executorIP using cluster node IP $clusterNodeIP")
           pendingTasksClusterNodeIP
         } else {
-          val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
-          val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(clusterNodeFullName)
-          if (pendingTasksClusterNodeFullName.nonEmpty) {
-            logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " +
-              s"for executor host $executorIP using cluster node full name $clusterNodeFullName")
+          if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) {
+            val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
+            val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(
+              clusterNodeFullName)
+            if (pendingTasksClusterNodeFullName.nonEmpty) {
+              logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " +
+                s"for executor host $executorIP using cluster node full name " +
+                s"$clusterNodeFullName")
+            }
+            pendingTasksClusterNodeFullName
+          } else {
+            pendingTasksExecutorIP // Empty
           }
-          pendingTasksClusterNodeFullName
         }
       }
     } else {
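
The lookup being gated here is the inetAddressUtil.getFullHostName(clusterNodeIP) call. A plausible sketch of such a helper, assuming it is backed by java.net.InetAddress; the actual InetAddressUtil implementation is not shown in this diff:

import java.net.InetAddress

// Assumption: this mirrors what InetAddressUtil.getFullHostName does; the
// real implementation is not part of this diff. getCanonicalHostName issues
// a blocking reverse-DNS query, which is why the call is now flag-guarded.
def getFullHostName(ipAddress: String): String =
  InetAddress.getByName(ipAddress).getCanonicalHostName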

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala

Lines changed: 33 additions & 1 deletion
@@ -20,18 +20,24 @@ import scala.collection.mutable.ArrayBuffer
 
 import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus}
 import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation}
 
-class KubernetesTaskSetManagerSuite extends SparkFunSuite {
+class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter {
 
   val sc = new SparkContext("local", "test")
   val sched = new FakeTaskScheduler(sc,
     ("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3"))
   val backend = mock(classOf[KubernetesClusterSchedulerBackend])
   sched.backend = backend
 
+  before {
+    sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)
+  }
+
   test("Find pending tasks for executors using executor pod IP addresses") {
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("10.0.0.1", "execA")), // Task 0 runs on executor pod 10.0.0.1.
@@ -76,7 +82,33 @@ class KubernetesTaskSetManagerSuite extends SparkFunSuite {
     assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer(1, 0))
   }
 
+  test("Test DNS lookup is disabled by default for cluster node full hostnames") {
+    assert(!sc.conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED))
+  }
+
+  test("Find pending tasks for executors, but avoid looking up cluster node FQDNs from DNS") {
+    sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false)
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here.
+      Seq(HostTaskLocation("kube-node1.domain1")) // Task 1's partition belongs to datanode here.
+    )
+    val spec1 = mock(classOf[PodSpec])
+    when(spec1.getNodeName).thenReturn("kube-node1")
+    val pod1 = mock(classOf[Pod])
+    when(pod1.getSpec).thenReturn(spec1)
+    val status1 = mock(classOf[PodStatus])
+    when(status1.getHostIP).thenReturn("196.0.0.5")
+    when(pod1.getStatus).thenReturn(status1)
+    val inetAddressUtil = mock(classOf[InetAddressUtil])
+    when(inetAddressUtil.getFullHostName("196.0.0.5")).thenReturn("kube-node1.domain1")
+    when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))
+
+    val manager = new KubernetesTaskSetManager(sched, taskSet, maxTaskFailures = 2, inetAddressUtil)
+    assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer())
+  }
+
   test("Find pending tasks for executors using cluster node FQDNs that executor pods run on") {
+    sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true)
     val taskSet = FakeTask.createTaskSet(2,
       Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here.
       Seq(HostTaskLocation("kube-node1.domain1")) // task 1's partition belongs to datanode here.
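
One possible hardening of the disabled-flag test above, not part of this commit: assert that the resolver mock is never consulted when the flag is off, using Mockito's verify with never().

import org.mockito.Mockito.{never, verify}

// Hypothetical extra assertion (not in this commit): with the flag disabled,
// the task set manager should never ask the resolver for a full hostname.
verify(inetAddressUtil, never()).getFullHostName("196.0.0.5")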
