
Commit 6975726

Merge pull request apache-spark-on-k8s#425 from palantir/yh/logging-v2
add safelogging
2 parents 9f68fed + 1369f8d

5 files changed: +127, -30 lines

core/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -416,6 +416,12 @@
       <scope>provided</scope>
     </dependency>

+    <dependency>
+      <groupId>com.palantir.safe-logging</groupId>
+      <artifactId>safe-logging</artifactId>
+      <version>1.5.1</version>
+    </dependency>
+
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
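
If this dependency is consumed from an sbt build instead of Maven, the equivalent declaration would be the single line below (illustrative only; the coordinates are the ones added in the hunk above, while Spark itself builds with Maven):

  libraryDependencies += "com.palantir.safe-logging" % "safe-logging" % "1.5.1"
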
core/src/main/scala/org/apache/spark/internal/SafeLogging.scala

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+import com.palantir.logsafe.Arg
+import org.slf4j.LoggerFactory
+
+trait SafeLogging {
+  private[this] val log_ = LoggerFactory.getLogger(this.getClass.getName)
+
+  def safeLogInfo(message: String, args: Arg[_]*): Unit = {
+    if (log_.isInfoEnabled) log_.info(message, args: _*)
+  }
+
+  def safeLogInfo(message: String, error: Throwable, args: Arg[_]*): Unit = {
+    if (log_.isInfoEnabled) log_.info(message, args :+ error: _*)
+  }
+
+  def safeLogDebug(message: String, args: Arg[_]*): Unit = {
+    if (log_.isDebugEnabled) log_.debug(message, args: _*)
+  }
+
+  def safeLogDebug(message: String, error: Throwable, args: Arg[_]*): Unit = {
+    if (log_.isDebugEnabled) log_.debug(message, args :+ error: _*)
+  }
+
+  def safeLogTrace(message: String, args: Arg[_]*): Unit = {
+    if (log_.isTraceEnabled) log_.trace(message, args: _*)
+  }
+
+  def safeLogTrace(message: String, error: Throwable, args: Arg[_]*): Unit = {
+    if (log_.isTraceEnabled) log_.trace(message, args :+ error: _*)
+  }
+
+  def safeLogWarning(message: String, args: Arg[_]*): Unit = {
+    if (log_.isWarnEnabled) log_.warn(message, args: _*)
+  }
+
+  def safeLogWarning(message: String, error: Throwable, args: Arg[_]*): Unit = {
+    if (log_.isWarnEnabled) log_.warn(message, args :+ error: _*)
+  }
+
+  def safeLogError(message: String, args: Arg[_]*): Unit = {
+    if (log_.isErrorEnabled) log_.error(message, args: _*)
+  }
+
+  def safeLogError(message: String, error: Throwable, args: Arg[_]*): Unit = {
+    if (log_.isErrorEnabled) log_.error(message, args :+ error: _*)
+  }
+}
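
The new trait mirrors Spark's existing Logging helpers, but each call takes a constant message plus named com.palantir.logsafe.Arg parameters instead of an interpolated string. A minimal usage sketch (the ExampleCaller class and its values are hypothetical, not part of this commit):

import com.palantir.logsafe.{SafeArg, UnsafeArg}
import org.apache.spark.internal.SafeLogging

class ExampleCaller extends SafeLogging {
  def announce(execId: Long, podName: String): Unit = {
    // SafeArg marks values known to be safe to collect and index;
    // UnsafeArg marks values a log pipeline may need to redact.
    safeLogInfo("Created executor pod.",
      SafeArg.of("executorId", execId),
      UnsafeArg.of("podName", podName))
  }

  def fail(execId: Long, cause: Throwable): Unit = {
    // The Throwable overloads append the error after the args; SLF4J treats
    // a trailing Throwable in the parameter array as the exception to log.
    safeLogError("Executor pod failed.", cause, SafeArg.of("executorId", execId))
  }
}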

dev/deps/spark-deps-hadoop-palantir

Lines changed: 1 addition & 0 deletions
@@ -176,6 +176,7 @@ parquet-jackson-1.10.1-palantir.3.jar
 protobuf-java-2.5.0.jar
 py4j-0.10.7.jar
 pyrolite-4.13.jar
+safe-logging-1.5.1.jar
 scala-compiler-2.11.12.jar
 scala-library-2.11.12.jar
 scala-parser-combinators_2.11-1.1.0.jar

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 31 additions & 18 deletions
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

+import com.palantir.logsafe.SafeArg
 import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.mutable
@@ -26,15 +27,15 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.util.{Clock, Utils}

 private[spark] class ExecutorPodsAllocator(
     conf: SparkConf,
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
-    clock: Clock) extends Logging {
+    clock: Clock) extends SafeLogging {

   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)

@@ -82,10 +83,12 @@ private[spark] class ExecutorPodsAllocator(
     newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
       val currentTime = clock.getTimeMillis()
       if (currentTime - timeCreated > podCreationTimeout) {
-        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
-          s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
+        safeLogWarning("Executor was not detected in the Kubernetes" +
+          " cluster after timeout despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
-          " deleted but the application missed the deletion event.")
+          " deleted but the application missed the deletion event.",
+          SafeArg.of("executorId", execId),
+          SafeArg.of("podCreationTimeoutMs", podCreationTimeout))
         Utils.tryLogNonFatalError {
           kubernetesClient
             .pods()
@@ -94,8 +97,10 @@
         }
         newlyCreatedExecutors -= execId
       } else {
-        logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
-          s" was created ${currentTime - timeCreated} milliseconds ago.")
+        safeLogDebug("Executor was not found in the Kubernetes cluster since it" +
+          " was created some time ago.",
+          SafeArg.of("executorId", execId),
+          SafeArg.of("timeSinceCreationMs", currentTime - timeCreated))
       }
     }

@@ -112,15 +117,19 @@
       case _ => false
     }
     val currentTotalExpectedExecutors = totalExpectedExecutors.get
-    logDebug(s"Currently have $currentRunningExecutors running executors and" +
-      s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
-      s" have been requested but are pending appearance in the cluster.")
+    safeLogDebug("Currently have running executors and" +
+      " pending executors. Newly created executors" +
+      " have been requested but are pending appearance in the cluster.",
+      SafeArg.of("numCurrentRunningExecutors", currentRunningExecutors),
+      SafeArg.of("numCurrentPendingExecutors", currentPendingExecutors),
+      SafeArg.of("newlyCreatedExecutors", newlyCreatedExecutors))
     if (newlyCreatedExecutors.isEmpty
       && currentPendingExecutors == 0
       && currentRunningExecutors < currentTotalExpectedExecutors) {
       val numExecutorsToAllocate = math.min(
         currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
-      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
+      safeLogInfo("Going to request executors from Kubernetes.",
+        SafeArg.of("numExecutorsToAllocate", numExecutorsToAllocate))
       for ( _ <- 0 until numExecutorsToAllocate) {
         val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
         val executorConf = KubernetesConf.createExecutorConf(
@@ -136,18 +145,22 @@
           .build()
         kubernetesClient.pods().create(podWithAttachedContainer)
         newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
-        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+        safeLogDebug("Requested executor from Kubernetes.",
+          SafeArg.of("newExecutorId", newExecutorId))
       }
     } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
       // TODO handle edge cases if we end up with more running executors than expected.
-      logDebug("Current number of running executors is equal to the number of requested" +
+      safeLogDebug("Current number of running executors is equal to the number of requested" +
         " executors. Not scaling up further.")
     } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
-      logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
-        s" executors to begin running before requesting for more executors. # of executors in" +
-        s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
-        s" created but we have not observed as being present in the cluster yet:" +
-        s" ${newlyCreatedExecutors.size}.")
+      safeLogDebug("Still waiting for" +
+        " executors to begin running before requesting for more executors, including executors" +
+        " in pending status in the cluster, and executors that we have" +
+        " created but we have not observed as being present in the cluster yet.",
+        SafeArg.of("numTotalCurrentWaitingExecutors",
+          newlyCreatedExecutors.size + currentPendingExecutors),
+        SafeArg.of("numCurrentPendingExecutors", currentPendingExecutors),
+        SafeArg.of("numNewlyCreatedExecutors", newlyCreatedExecutors.size))
     }
   }
 }
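
Every log change in this file follows the same mechanical pattern: an interpolated s-string becomes a fixed message template plus named SafeArg parameters, so all occurrences of a given event share one constant message string that log tooling can group and index on. A condensed before/after sketch (hypothetical object, mirroring the logInfo call converted above):

import com.palantir.logsafe.SafeArg
import org.apache.spark.internal.SafeLogging

object MigrationSketch extends SafeLogging {
  def report(numExecutorsToAllocate: Int): Unit = {
    // Before, with the Logging trait (value baked into the message):
    //   logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
    // After: one constant template, one named structured parameter.
    safeLogInfo("Going to request executors from Kubernetes.",
      SafeArg.of("numExecutorsToAllocate", numExecutorsToAllocate))
  }
}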

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

Lines changed: 23 additions & 12 deletions
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s

 import com.google.common.cache.Cache
+import com.palantir.logsafe.SafeArg
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
@@ -25,7 +26,7 @@ import scala.collection.mutable
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils

@@ -37,7 +38,7 @@ private[spark] class ExecutorPodsLifecycleManager(
     // job-breaking if we remove executors more than once but it's ideal if we make an attempt
     // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
     // bounds.
-    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends Logging {
+    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends SafeLogging {

   import ExecutorPodsLifecycleManager._

@@ -57,18 +58,24 @@
     snapshot.executorPods.foreach { case (execId, state) =>
       state match {
         case deleted@PodDeleted(_) =>
-          logDebug(s"Snapshot reported deleted executor with id $execId," +
-            s" pod name ${state.pod.getMetadata.getName}")
+          safeLogDebug(
+            "Snapshot reported deleted executor",
+            SafeArg.of("executorId", execId),
+            SafeArg.of("podName", state.pod.getMetadata.getName))
           removeExecutorFromSpark(schedulerBackend, deleted, execId)
           execIdsRemovedInThisRound += execId
         case failed@PodFailed(_) =>
-          logDebug(s"Snapshot reported failed executor with id $execId," +
-            s" pod name ${state.pod.getMetadata.getName}")
+          safeLogDebug(
+            "Snapshot reported failed executor",
+            SafeArg.of("executorId", execId),
+            SafeArg.of("podName", state.pod.getMetadata.getName))
           onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
         case succeeded@PodSucceeded(_) =>
-          logDebug(s"Snapshot reported succeeded executor with id $execId," +
-            s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
-            s" unusual unless Spark specifically informed the executor to exit.")
+          safeLogDebug(
+            "Snapshot reported succeeded executor." +
+            " Note that unusual unless Spark specifically informed the executor to exit.",
+            SafeArg.of("executorId", execId),
+            SafeArg.of("podName", state.pod.getMetadata.getName))
           onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
         case _ =>
       }
@@ -89,7 +96,10 @@
     val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
       s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
       s" executor may have been deleted but the driver missed the deletion event."
-    logDebug(exitReasonMessage)
+    safeLogDebug("The executor was not found in the" +
+      " cluster but we didn't get a reason why. Marking the executor as failed. The" +
+      " executor may have been deleted but the driver missed the deletion event.",
+      SafeArg.of("missingExecutorId", missingExecutorId))
     val exitReason = ExecutorExited(
       UNKNOWN_EXIT_CODE,
       exitCausedByApp = false,
@@ -101,8 +111,9 @@
     }

     if (execIdsRemovedInThisRound.nonEmpty) {
-      logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
-        s" from Spark that were either found to be deleted or non-existent in the cluster.")
+      safeLogDebug("Removed executors" +
+        " from Spark that were either found to be deleted or non-existent in the cluster.",
+        SafeArg.of("executorIdsRemovedInThisRound", execIdsRemovedInThisRound.mkString(",")))
     }
   }
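
One behavioral difference worth noting: Spark's Logging trait takes its message by name (msg: => String), so the old interpolations only ran when the level was enabled, whereas SafeLogging's Arg varargs are evaluated strictly at the call site; the mkString above runs even with debug logging off. A small sketch of the implication (hypothetical object and values):

import com.palantir.logsafe.SafeArg
import org.apache.spark.internal.SafeLogging

object EvaluationSketch extends SafeLogging {
  def demo(removedIds: Seq[Long]): Unit = {
    // removedIds.mkString(",") is computed before safeLogDebug is entered,
    // whether or not debug is enabled; hoist expensive argument expressions
    // behind an explicit level check if this matters on a hot path.
    safeLogDebug("Removed executors.",
      SafeArg.of("executorIds", removedIds.mkString(",")))
  }
}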
