Skip to content

Commit 9a98bd4

Browse files
committed
use safelogging?
1 parent 3711dbc commit 9a98bd4

File tree

5 files changed

+123
-106
lines changed

5 files changed

+123
-106
lines changed

core/src/main/scala/org/apache/spark/internal/SafeLogging.scala

Lines changed: 0 additions & 79 deletions
This file was deleted.

resource-managers/kubernetes/core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@
8989
<version>3.8.1</version>
9090
</dependency>
9191

92+
<dependency>
93+
<groupId>com.palantir.safe-logging</groupId>
94+
<artifactId>safe-logging</artifactId>
95+
<version>1.5.1</version>
96+
</dependency>
97+
9298
<dependency>
9399
<groupId>org.mockito</groupId>
94100
<artifactId>mockito-core</artifactId>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.logging
19+
20+
import com.palantir.logsafe.Arg
21+
import org.slf4j.LoggerFactory
22+
23+
/**
 * Mixin that provides "safe" parameterized logging on top of SLF4J.
 *
 * Messages are plain templates containing `{name}` placeholders, and the
 * corresponding values are supplied as `com.palantir.logsafe.Arg` instances
 * (e.g. `SafeArg.of("execId", execId)`), so downstream log collection can
 * distinguish safe-to-record parameters from the message text. Every level
 * is guarded by the matching `isXxxEnabled` check so the varargs array is
 * not built when the level is disabled.
 */
trait SafeLogging {
  // Logger is named after the concrete class mixing in this trait.
  private[this] val log_ = LoggerFactory.getLogger(this.getClass.getName)

  /** Logs at INFO with safe-arg placeholders. */
  def safeLogInfo(message: String, args: Arg[_]*): Unit = {
    if (log_.isInfoEnabled) log_.info(message, args: _*)
  }

  /**
   * Logs at INFO with safe-arg placeholders and an exception stack trace.
   *
   * The throwable is appended to the varargs array: SLF4J treats a trailing
   * Throwable in the argument list as the exception to log. The previous form
   * `log_.info(message, error, args: _*)` does not compile, because Scala
   * forbids mixing a fixed argument with a `: _*` vararg expansion.
   */
  def safeLogInfo(message: String, error: Throwable, args: Arg[_]*): Unit = {
    if (log_.isInfoEnabled) log_.info(message, (args :+ error): _*)
  }

  /** Logs at DEBUG with safe-arg placeholders. */
  def safeLogDebug(message: String, args: Arg[_]*): Unit = {
    if (log_.isDebugEnabled) log_.debug(message, args: _*)
  }

  /** Logs at DEBUG with safe-arg placeholders and an exception stack trace. */
  def safeLogDebug(message: String, error: Throwable, args: Arg[_]*): Unit = {
    // Trailing Throwable in varargs is rendered as the stack trace by SLF4J.
    if (log_.isDebugEnabled) log_.debug(message, (args :+ error): _*)
  }

  /** Logs at TRACE with safe-arg placeholders. */
  def safeLogTrace(message: String, args: Arg[_]*): Unit = {
    if (log_.isTraceEnabled) log_.trace(message, args: _*)
  }

  /** Logs at TRACE with safe-arg placeholders and an exception stack trace. */
  def safeLogTrace(message: String, error: Throwable, args: Arg[_]*): Unit = {
    if (log_.isTraceEnabled) log_.trace(message, (args :+ error): _*)
  }

  /** Logs at WARN with safe-arg placeholders. */
  def safeLogWarning(message: String, args: Arg[_]*): Unit = {
    if (log_.isWarnEnabled) log_.warn(message, args: _*)
  }

  /** Logs at WARN with safe-arg placeholders and an exception stack trace. */
  def safeLogWarning(message: String, error: Throwable, args: Arg[_]*): Unit = {
    if (log_.isWarnEnabled) log_.warn(message, (args :+ error): _*)
  }

  /** Logs at ERROR with safe-arg placeholders. */
  def safeLogError(message: String, args: Arg[_]*): Unit = {
    if (log_.isErrorEnabled) log_.error(message, args: _*)
  }

  /** Logs at ERROR with safe-arg placeholders and an exception stack trace. */
  def safeLogError(message: String, error: Throwable, args: Arg[_]*): Unit = {
    if (log_.isErrorEnabled) log_.error(message, (args :+ error): _*)
  }
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
1818

1919
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
2020

21+
import com.palantir.logsafe.SafeArg
2122
import io.fabric8.kubernetes.api.model.PodBuilder
2223
import io.fabric8.kubernetes.client.KubernetesClient
2324
import scala.collection.mutable
@@ -26,7 +27,7 @@ import org.apache.spark.{SparkConf, SparkException}
2627
import org.apache.spark.deploy.k8s.Config._
2728
import org.apache.spark.deploy.k8s.Constants._
2829
import org.apache.spark.deploy.k8s.KubernetesConf
29-
import org.apache.spark.internal.SafeLogging
30+
import org.apache.spark.logging.SafeLogging
3031
import org.apache.spark.util.{Clock, Utils}
3132

3233
private[spark] class ExecutorPodsAllocator(
@@ -82,10 +83,12 @@ private[spark] class ExecutorPodsAllocator(
8283
newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
8384
val currentTime = clock.getTimeMillis()
8485
if (currentTime - timeCreated > podCreationTimeout) {
85-
safeLogWarning(s"Executor with id $execId was not detected in the Kubernetes" +
86-
s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
86+
safeLogWarning("Executor with id {execId} was not detected in the Kubernetes" +
87+
" cluster after {podCreationTimeout} milliseconds despite the fact that a" +
8788
" previous allocation attempt tried to create it. The executor may have been" +
88-
" deleted but the application missed the deletion event.")
89+
" deleted but the application missed the deletion event.",
90+
SafeArg.of("execId", execId),
91+
SafeArg.of("podCreationTimeout", podCreationTimeout))
8992
Utils.tryLogNonFatalError {
9093
kubernetesClient
9194
.pods()
@@ -94,8 +97,10 @@ private[spark] class ExecutorPodsAllocator(
9497
}
9598
newlyCreatedExecutors -= execId
9699
} else {
97-
safeLogDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
98-
s" was created ${currentTime - timeCreated} milliseconds ago.")
100+
safeLogDebug("Executor with id {execId} was not found in the Kubernetes cluster since it" +
101+
" was created {timeSinceCreation} milliseconds ago.",
102+
SafeArg.of("execId", execId),
103+
SafeArg.of("timeSinceCreation", currentTime - timeCreated))
99104
}
100105
}
101106

@@ -112,15 +117,19 @@ private[spark] class ExecutorPodsAllocator(
112117
case _ => false
113118
}
114119
val currentTotalExpectedExecutors = totalExpectedExecutors.get
115-
safeLogDebug(s"Currently have $currentRunningExecutors running executors and" +
116-
s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
117-
s" have been requested but are pending appearance in the cluster.")
120+
safeLogDebug("Currently have {currentRunningExecutors} running executors and" +
121+
" {currentPendingExecutors} pending executors. {newlyCreatedExecutors} executors" +
122+
" have been requested but are pending appearance in the cluster.",
123+
SafeArg.of("currentRunningExecutors", currentRunningExecutors),
124+
SafeArg.of("currentPendingExecutors", currentPendingExecutors),
125+
SafeArg.of("newlyCreatedExecutors", newlyCreatedExecutors))
118126
if (newlyCreatedExecutors.isEmpty
119127
&& currentPendingExecutors == 0
120128
&& currentRunningExecutors < currentTotalExpectedExecutors) {
121129
val numExecutorsToAllocate = math.min(
122130
currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
123-
safeLogInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
131+
safeLogInfo("Going to request {numExecutorsToAllocate} executors from Kubernetes.",
132+
SafeArg.of("numExecutorsToAllocate", numExecutorsToAllocate))
124133
for ( _ <- 0 until numExecutorsToAllocate) {
125134
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
126135
val executorConf = KubernetesConf.createExecutorConf(
@@ -136,18 +145,23 @@ private[spark] class ExecutorPodsAllocator(
136145
.build()
137146
kubernetesClient.pods().create(podWithAttachedContainer)
138147
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
139-
safeLogDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
148+
safeLogDebug("Requested executor with id {newExecutorId} from Kubernetes.",
149+
SafeArg.of("newExecutorId", newExecutorId))
140150
}
141151
} else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
142152
// TODO handle edge cases if we end up with more running executors than expected.
143153
safeLogDebug("Current number of running executors is equal to the number of requested" +
144154
" executors. Not scaling up further.")
145155
} else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
146-
safeLogDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
147-
s" executors to begin running before requesting for more executors. # of executors in" +
148-
s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
149-
s" created but we have not observed as being present in the cluster yet:" +
150-
s" ${newlyCreatedExecutors.size}.")
156+
safeLogDebug("Still waiting for {currentWaitingExecutors}" +
157+
" executors to begin running before requesting for more executors. # of executors in" +
158+
" pending status in the cluster: {currentPendingExecutors}. # of executors that we have" +
159+
" created but we have not observed as being present in the cluster yet:" +
160+
" ${newlyCreatedExecutors}.",
161+
SafeArg.of("currentWaitingExecutors",
162+
newlyCreatedExecutors.size + currentPendingExecutors),
163+
SafeArg.of("currentPendingExecutors", currentPendingExecutors),
164+
SafeArg.of("newlyCreatedExecutors", newlyCreatedExecutors.size))
151165
}
152166
}
153167
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import com.google.common.cache.Cache
20+
import com.palantir.logsafe.SafeArg
2021
import io.fabric8.kubernetes.api.model.Pod
2122
import io.fabric8.kubernetes.client.KubernetesClient
2223
import scala.collection.JavaConverters._
@@ -25,7 +26,7 @@ import scala.collection.mutable
2526
import org.apache.spark.SparkConf
2627
import org.apache.spark.deploy.k8s.Config._
2728
import org.apache.spark.deploy.k8s.KubernetesUtils._
28-
import org.apache.spark.internal.SafeLogging
29+
import org.apache.spark.logging.SafeLogging
2930
import org.apache.spark.scheduler.ExecutorExited
3031
import org.apache.spark.util.Utils
3132

@@ -57,18 +58,24 @@ private[spark] class ExecutorPodsLifecycleManager(
5758
snapshot.executorPods.foreach { case (execId, state) =>
5859
state match {
5960
case deleted@PodDeleted(_) =>
60-
safeLogDebug(s"Snapshot reported deleted executor with id $execId," +
61-
s" pod name ${state.pod.getMetadata.getName}")
61+
safeLogDebug(
62+
"Snapshot reported deleted executor with id {execId} and pod name {podName}",
63+
SafeArg.of("execId", execId),
64+
SafeArg.of("podName", state.pod.getMetadata.getName))
6265
removeExecutorFromSpark(schedulerBackend, deleted, execId)
6366
execIdsRemovedInThisRound += execId
6467
case failed@PodFailed(_) =>
65-
safeLogDebug(s"Snapshot reported failed executor with id $execId," +
66-
s" pod name ${state.pod.getMetadata.getName}")
68+
safeLogDebug(
69+
"Snapshot reported failed executor with id {execId} and pod name {podName}",
70+
SafeArg.of("execId", execId),
71+
SafeArg.of("podName", state.pod.getMetadata.getName))
6772
onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
6873
case succeeded@PodSucceeded(_) =>
69-
safeLogDebug(s"Snapshot reported succeeded executor with id $execId," +
70-
s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
71-
s" unusual unless Spark specifically informed the executor to exit.")
74+
safeLogDebug(
75+
"Snapshot reported succeeded executor with id {execId} and pod name {podName}." +
76+
" Note that unusual unless Spark specifically informed the executor to exit.",
77+
SafeArg.of("execId", execId),
78+
SafeArg.of("podName", state.pod.getMetadata.getName))
7279
onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
7380
case _ =>
7481
}
@@ -89,7 +96,10 @@ private[spark] class ExecutorPodsLifecycleManager(
8996
val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
9097
s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
9198
s" executor may have been deleted but the driver missed the deletion event."
92-
safeLogDebug(exitReasonMessage)
99+
safeLogDebug("The executor with ID {missingExecutorId} was not found in the" +
100+
" cluster but we didn't get a reason why. Marking the executor as failed. The" +
101+
" executor may have been deleted but the driver missed the deletion event.",
102+
SafeArg.of("missingExecutorId", missingExecutorId))
93103
val exitReason = ExecutorExited(
94104
UNKNOWN_EXIT_CODE,
95105
exitCausedByApp = false,
@@ -101,8 +111,9 @@ private[spark] class ExecutorPodsLifecycleManager(
101111
}
102112

103113
if (execIdsRemovedInThisRound.nonEmpty) {
104-
safeLogDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
105-
s" from Spark that were either found to be deleted or non-existent in the cluster.")
114+
safeLogDebug("Removed executors with ids {execIdsRemovedInThisRound}" +
115+
" from Spark that were either found to be deleted or non-existent in the cluster.",
116+
SafeArg.of("execIdsRemovedInThisRound", execIdsRemovedInThisRound.mkString(",")))
106117
}
107118
}
108119

0 commit comments

Comments
 (0)