Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 6882a1b

Browse files
lins05ash211
authored andcommitted
Exit properly when the k8s cluster is not available. (#256)
* Exit properly when the k8s cluster is not available. * add jetty to k8s module dependency so we can use only rebuild the k8s module. * CR * Fixed single thread scheduler. * Fixed scalastyle check. * CR
1 parent e071ad9 commit 6882a1b

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

resource-managers/kubernetes/core/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,3 @@
133133
</build>
134134

135135
</project>
136-

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ private[spark] class Client(
155155
.pods()
156156
.withName(kubernetesDriverPodName)
157157
.watch(loggingWatch)) { _ =>
158+
loggingWatch.start()
158159
val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() =>
159160
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
160161
val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
2424
import scala.collection.JavaConverters._
2525

2626
import org.apache.spark.internal.Logging
27+
import org.apache.spark.util.ThreadUtils
2728

2829
/**
2930
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
@@ -40,19 +41,23 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
4041
extends Watcher[Pod] with Logging {
4142

4243
// start timer for periodic logging
43-
private val scheduler = Executors.newScheduledThreadPool(1)
44+
private val scheduler =
45+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
4446
private val logRunnable: Runnable = new Runnable {
4547
override def run() = logShortStatus()
4648
}
47-
if (interval > 0) {
48-
scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
49-
}
5049

5150
private var pod: Option[Pod] = Option.empty
5251
private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
5352
private def status: String = pod.map(_.getStatus().getContainerStatuses().toString())
5453
.getOrElse("unknown")
5554

55+
def start(): Unit = {
56+
if (interval > 0) {
57+
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
58+
}
59+
}
60+
5661
override def eventReceived(action: Action, pod: Pod): Unit = {
5762
this.pod = Option(pod)
5863
action match {

0 commit comments

Comments
 (0)