Commit 3711dbc

add safelogging

1 parent efbb2ff commit 3711dbc

3 files changed: 95 additions & 16 deletions
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+trait SafeLogging extends Logging {
+  // Method to get the logger name for this object
+  override protected def logName = {
+    // Ignore trailing $'s in the class names for Scala objects
+    s"com.palantir.${this.getClass.getName.stripSuffix("$")}"
+  }
+
+  protected def safeLogInfo(msg: => String) {
+    if (log.isInfoEnabled) log.info(msg)
+  }
+  protected def safeLogDebug(msg: => String) {
+    if (log.isDebugEnabled) log.debug(msg)
+  }
+
+  protected def safeLogTrace(msg: => String) {
+    if (log.isTraceEnabled) log.trace(msg)
+  }
+
+  protected def safeLogWarning(msg: => String) {
+    if (log.isWarnEnabled) log.warn(msg)
+  }
+
+  protected def safeLogError(msg: => String) {
+    if (log.isErrorEnabled) log.error(msg)
+  }
+
+
+  // You can only safelog to SafeLogging logs, everything else is unsupported
+  override protected def logInfo(msg: => String) {
+    if (log.isWarnEnabled) log.warn(msg)
+  }
+  override protected def logDebug(msg: => String) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logTrace(msg: => String) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logWarning(msg: => String) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logError(msg: => String) {
+    throw new UnsupportedOperationException
+  }
+
+  override protected def logInfo(msg: => String, throwable: Throwable) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logDebug(msg: => String, throwable: Throwable) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logTrace(msg: => String, throwable: Throwable) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logWarning(msg: => String, throwable: Throwable) {
+    throw new UnsupportedOperationException
+  }
+  override protected def logError(msg: => String, throwable: Throwable) {
+    throw new UnsupportedOperationException
+  }
+}
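
For orientation, here is a minimal sketch of how a class might mix in the new trait; the package, class name, and messages below are hypothetical and not part of this commit. Because SafeLogging overrides logName, output from such a class is emitted under the logger "com.palantir." followed by the fully qualified class name, and because the messages are by-name parameters they are only evaluated when the corresponding level is enabled.

    package org.apache.spark.example // hypothetical package, for illustration only

    import org.apache.spark.internal.SafeLogging

    // Hypothetical class: its log output would appear under the logger name
    // "com.palantir.org.apache.spark.example.PodTracker".
    private[spark] class PodTracker extends SafeLogging {
      def track(podName: String): Unit = {
        safeLogInfo(s"Tracking pod $podName")           // routed through log.info
        safeLogDebug(s"Requested details for $podName") // message built only if debug is enabled
      }
    }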

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 9 additions & 9 deletions
@@ -26,15 +26,15 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.util.{Clock, Utils}
 
 private[spark] class ExecutorPodsAllocator(
     conf: SparkConf,
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
-    clock: Clock) extends Logging {
+    clock: Clock) extends SafeLogging {
 
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 
@@ -82,7 +82,7 @@ private[spark] class ExecutorPodsAllocator(
     newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
       val currentTime = clock.getTimeMillis()
       if (currentTime - timeCreated > podCreationTimeout) {
-        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
+        safeLogWarning(s"Executor with id $execId was not detected in the Kubernetes" +
           s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
           " deleted but the application missed the deletion event.")
@@ -94,7 +94,7 @@ private[spark] class ExecutorPodsAllocator(
         }
         newlyCreatedExecutors -= execId
       } else {
-        logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
+        safeLogDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
           s" was created ${currentTime - timeCreated} milliseconds ago.")
       }
     }
@@ -112,15 +112,15 @@ private[spark] class ExecutorPodsAllocator(
       case _ => false
     }
     val currentTotalExpectedExecutors = totalExpectedExecutors.get
-    logDebug(s"Currently have $currentRunningExecutors running executors and" +
+    safeLogDebug(s"Currently have $currentRunningExecutors running executors and" +
       s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
       s" have been requested but are pending appearance in the cluster.")
     if (newlyCreatedExecutors.isEmpty
       && currentPendingExecutors == 0
       && currentRunningExecutors < currentTotalExpectedExecutors) {
       val numExecutorsToAllocate = math.min(
         currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
-      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
+      safeLogInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
       for ( _ <- 0 until numExecutorsToAllocate) {
         val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
         val executorConf = KubernetesConf.createExecutorConf(
@@ -136,14 +136,14 @@ private[spark] class ExecutorPodsAllocator(
           .build()
         kubernetesClient.pods().create(podWithAttachedContainer)
         newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
-        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+        safeLogDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
       }
     } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
       // TODO handle edge cases if we end up with more running executors than expected.
-      logDebug("Current number of running executors is equal to the number of requested" +
+      safeLogDebug("Current number of running executors is equal to the number of requested" +
         " executors. Not scaling up further.")
     } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
-      logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
+      safeLogDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
        s" executors to begin running before requesting for more executors. # of executors in" +
        s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
        s" created but we have not observed as being present in the cluster yet:" +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

Lines changed: 7 additions & 7 deletions
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils
 
@@ -37,7 +37,7 @@ private[spark] class ExecutorPodsLifecycleManager(
     // job-breaking if we remove executors more than once but it's ideal if we make an attempt
     // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
    // bounds.
-    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends Logging {
+    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends SafeLogging {
 
   import ExecutorPodsLifecycleManager._
 
@@ -57,16 +57,16 @@ private[spark] class ExecutorPodsLifecycleManager(
     snapshot.executorPods.foreach { case (execId, state) =>
       state match {
         case deleted@PodDeleted(_) =>
-          logDebug(s"Snapshot reported deleted executor with id $execId," +
+          safeLogDebug(s"Snapshot reported deleted executor with id $execId," +
             s" pod name ${state.pod.getMetadata.getName}")
           removeExecutorFromSpark(schedulerBackend, deleted, execId)
           execIdsRemovedInThisRound += execId
         case failed@PodFailed(_) =>
-          logDebug(s"Snapshot reported failed executor with id $execId," +
+          safeLogDebug(s"Snapshot reported failed executor with id $execId," +
             s" pod name ${state.pod.getMetadata.getName}")
           onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
         case succeeded@PodSucceeded(_) =>
-          logDebug(s"Snapshot reported succeeded executor with id $execId," +
+          safeLogDebug(s"Snapshot reported succeeded executor with id $execId," +
             s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
             s" unusual unless Spark specifically informed the executor to exit.")
           onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
@@ -89,7 +89,7 @@ private[spark] class ExecutorPodsLifecycleManager(
     val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
       s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
       s" executor may have been deleted but the driver missed the deletion event."
-    logDebug(exitReasonMessage)
+    safeLogDebug(exitReasonMessage)
     val exitReason = ExecutorExited(
       UNKNOWN_EXIT_CODE,
       exitCausedByApp = false,
@@ -101,7 +101,7 @@ private[spark] class ExecutorPodsLifecycleManager(
     }
 
     if (execIdsRemovedInThisRound.nonEmpty) {
-      logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
+      safeLogDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
        s" from Spark that were either found to be deleted or non-existent in the cluster.")
     }
   }
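
A note on the call-site changes in the two files above: SafeLogging overrides the inherited logDebug, logTrace, logWarning, and logError variants to throw UnsupportedOperationException (the no-throwable logInfo is instead redirected to warn level), so a class switched to the trait has to go through the safeLog* methods. Below is a small illustrative sketch of that behavior, with a hypothetical class name not taken from this commit.

    package org.apache.spark.example // hypothetical package, for illustration only

    import org.apache.spark.internal.SafeLogging

    private[spark] class SnapshotReporter extends SafeLogging {
      def report(execId: Long): Unit = {
        // Supported: goes through the "com.palantir."-prefixed logger at debug level.
        safeLogDebug(s"Handled snapshot for executor $execId")

        // A leftover plain call like the one below would fail fast at runtime,
        // because the inherited override throws UnsupportedOperationException:
        // logDebug(s"Handled snapshot for executor $execId")
      }
    }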
