
Commit 2fb596d

Address CR comments.

1 parent: c565c9f

2 files changed: +17 −22 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

Lines changed: 3 additions & 3 deletions
@@ -93,13 +93,13 @@ package object config extends Logging {

   private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
-      .doc("Number of pods to launch at once in each round of dynamic allocation. ")
+      .doc("Number of pods to launch at once in each round of executor allocation.")
       .intConf
       .createWithDefault(5)

   private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
-      .doc("Number of seconds to wait between each round of executor allocation. ")
+      .doc("Number of seconds to wait between each round of executor allocation.")
       .longConf
       .createWithDefault(1)
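For context, these two settings govern how quickly executor pods ramp up: each round of executor allocation launches up to batch.size pods, with batch.delay seconds between rounds. A minimal sketch of tuning them (the keys and defaults come from the diff above; the values here are hypothetical):

  import org.apache.spark.SparkConf

  // Hypothetical tuning: launch executor pods 10 at a time, pausing
  // 2 seconds between allocation rounds (the defaults are 5 and 1).
  val conf = new SparkConf()
    .set("spark.kubernetes.allocation.batch.size", "10")
    .set("spark.kubernetes.allocation.batch.delay", "2")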

@@ -129,7 +129,7 @@ package object config extends Logging {
       masterWithoutK8sPrefix
     } else {
       val resolvedURL = s"https://$masterWithoutK8sPrefix"
-      logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
+      logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
         s" URL is $resolvedURL")
       resolvedURL
     }
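The logInfo branch above fires only when the user supplies a bare host and port. A standalone re-implementation of the scheme-defaulting logic for illustration (the enclosing helper's name and signature are assumptions, since the hunk shows only its body):

  // Sketch: default a Kubernetes master URL to https when no scheme is given.
  def resolveK8sMasterUrl(rawMasterURL: String): String = {
    require(rawMasterURL.startsWith("k8s://"), "Master URL must start with k8s://")
    val masterWithoutK8sPrefix = rawMasterURL.stripPrefix("k8s://")
    if (masterWithoutK8sPrefix.startsWith("http://") ||
        masterWithoutK8sPrefix.startsWith("https://")) {
      masterWithoutK8sPrefix
    } else {
      // e.g. k8s://example-apiserver:6443 resolves to https://example-apiserver:6443
      s"https://$masterWithoutK8sPrefix"
    }
  }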

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 14 additions & 19 deletions
@@ -66,16 +66,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     requestExecutorsService)

-  private val driverPod = try {
-    kubernetesClient.pods()
-      .inNamespace(kubernetesNamespace)
-      .withName(kubernetesDriverPodName)
-      .get()
-  } catch {
-    case throwable: Throwable =>
-      logError(s"Executor cannot find driver pod.", throwable)
-      throw new SparkException(s"Executor cannot find driver pod", throwable)
-  }
+  private val driverPod = kubernetesClient.pods()
+    .inNamespace(kubernetesNamespace)
+    .withName(kubernetesDriverPodName)
+    .get()

   override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -142,13 +136,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
       knownExitReason.fold {
         removeExecutorOrIncrementLossReasonCheckCount(executorId)
       } { executorExited =>
-        logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+        logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
         removeExecutor(executorId, executorExited)
         // We keep around executors that have exit conditions caused by the application. This
         // allows them to be debugged later on. Otherwise, mark them as to be deleted from the
         // the API server.
         if (!executorExited.exitCausedByApp) {
+          logInfo(s"Executor $executorId failed because of a framework error.")
           deleteExecutorFromClusterAndDataStructures(executorId)
+        } else {
+          logInfo(s"Executor $executorId exited because of the application.")
         }
       }
     }
@@ -192,8 +189,6 @@ private[spark] class KubernetesClusterSchedulerBackend(

   }

-  override def applicationId(): String = conf.get("spark.app.id", super.applicationId())
-
   override def sufficientResourcesRegistered(): Boolean = {
     totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
   }
@@ -331,10 +326,10 @@ private[spark] class KubernetesClusterSchedulerBackend(

     override def eventReceived(action: Action, pod: Pod): Unit = {
       if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
-        && pod.getMetadata.getDeletionTimestamp == null) {
+          && pod.getMetadata.getDeletionTimestamp == null) {
         val podIP = pod.getStatus.getPodIP
         val clusterNodeName = pod.getSpec.getNodeName
-        logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
+        logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
         executorPodsByIPs.put(podIP, pod)
       } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
           action == Action.DELETED || action == Action.ERROR) {
@@ -345,10 +340,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
         executorPodsByIPs.remove(podIP)
       }
       if (action == Action.ERROR) {
-        logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
+        logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
         handleErroredPod(pod)
       } else if (action == Action.DELETED) {
-        logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
+        logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
         handleDeletedPod(pod)
       }
     }
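The eventReceived callback and Action values above come from the fabric8 Kubernetes client's Watcher interface. A stripped-down sketch of that shape, assuming the fabric8 client versions of this era; it is not the project's actual watcher, which also maintains the executorPodsByIPs map:

  import io.fabric8.kubernetes.api.model.Pod
  import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
  import io.fabric8.kubernetes.client.Watcher.Action

  // Minimal pod watcher: log pods coming up and going away.
  class LoggingPodWatcher extends Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = {
      val name = pod.getMetadata.getName
      action match {
        case Action.MODIFIED if pod.getStatus.getPhase == "Running" =>
          println(s"Pod $name is running on node ${pod.getSpec.getNodeName}")
        case Action.ERROR | Action.DELETED =>
          println(s"Pod $name went away, reason: ${pod.getStatus.getReason}")
        case _ => // ignore ADDED and other MODIFIED transitions
      }
    }

    override def onClose(cause: KubernetesClientException): Unit = {
      if (cause != null) println(s"Watch closed abnormally: ${cause.getMessage}")
    }
  }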
@@ -386,8 +381,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
       // container was probably actively killed by the driver.
       val exitReason = if (isPodAlreadyReleased(pod)) {
         ExecutorExited(containerExitStatus, exitCausedByApp = false,
-          s"Container in pod " + pod.getMetadata.getName +
-            " exited from explicit termination request.")
+          s"Container in pod ${pod.getMetadata.getName} exited from explicit termination" +
+            " request.")
       } else {
         val containerExitReason = s"Pod ${pod.getMetadata.getName}'s executor container " +
           s"exited with exit status code $containerExitStatus."
