Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 2303aad

Browse files
foxish authored and ash211 committed
Richer logging and better error handling in driver pod watch (#154)
* pod-watch progress around watch events
* Simplify return
* comments
1 parent 39c2cf2 commit 2303aad

File tree

1 file changed

+28
-15
lines changed

1 file changed

+28
-15
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala

Lines changed: 28 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -50,35 +50,47 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
5050
}
5151

5252
private var pod: Option[Pod] = Option.empty
53-
private var prevPhase: String = null
5453
private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
54+
private def status: String = pod.map(_.getStatus().getContainerStatuses().toString())
55+
.getOrElse("unknown")
5556

5657
override def eventReceived(action: Action, pod: Pod): Unit = {
5758
this.pod = Option(pod)
58-
59-
logShortStatus()
60-
if (prevPhase != phase) {
61-
logLongStatus()
62-
}
63-
prevPhase = phase
64-
65-
if (phase == "Succeeded" || phase == "Failed") {
66-
podCompletedFuture.countDown()
67-
scheduler.shutdown()
59+
action match {
60+
case Action.DELETED =>
61+
closeWatch()
62+
63+
case Action.ERROR =>
64+
closeWatch()
65+
66+
case _ =>
67+
logLongStatus()
68+
if (hasCompleted()) {
69+
closeWatch()
70+
}
6871
}
6972
}
7073

7174
override def onClose(e: KubernetesClientException): Unit = {
72-
scheduler.shutdown()
73-
logDebug(s"Stopped watching application $appId with last-observed phase $phase")
75+
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
76+
closeWatch()
7477
}
7578

7679
private def logShortStatus() = {
7780
logInfo(s"Application status for $appId (phase: $phase)")
7881
}
7982

8083
private def logLongStatus() = {
81-
logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown"))
84+
logInfo("State changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown"))
85+
}
86+
87+
private def hasCompleted(): Boolean = {
88+
phase == "Succeeded" || phase == "Failed"
89+
}
90+
91+
private def closeWatch(): Unit = {
92+
podCompletedFuture.countDown()
93+
scheduler.shutdown()
8294
}
8395

8496
private def formatPodState(pod: Pod): String = {
@@ -103,7 +115,8 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
103115
.asScala
104116
.map(_.getImage)
105117
.mkString(", ")),
106-
("phase", pod.getStatus.getPhase())
118+
("phase", pod.getStatus.getPhase()),
119+
("status", pod.getStatus.getContainerStatuses().toString)
107120
)
108121

109122
// Use more loggable format if value is null or empty

0 commit comments

Comments
 (0)