Commit 096a4ef

dvogelbacher authored and Robert Kruszewski committed
Add SafeLogging to more classes (apache-spark-on-k8s#466)

Add SafeLogging (similar to palantir#425) to more classes where it can be useful:
- k8s classes
- CoarseGrainedSchedulerBackend
- SparkContext
- MemoryStore
- Executor
- CoarseGrainedExecutorBackend
- TorrentBroadcast and Broadcast
1 parent 1248e31 commit 096a4ef
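Every conversion in this commit follows the same shape: a constant log message plus `com.palantir.logsafe.SafeArg` / `UnsafeArg` parameters, emitted through `safeLog*` methods from a `SafeLogging` trait in `org.apache.spark.internal` (introduced to this fork in palantir#425). The trait itself is not part of this diff; the sketch below is only an assumption about its shape, inferred from the call sites in the files changed here, with an SLF4J logger standing in for whatever logsafe-aware logger the fork actually wires up.

```scala
import com.palantir.logsafe.Arg
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical sketch of a SafeLogging trait -- NOT the fork's actual implementation.
// Only the method names and call shapes (safeLogInfo(msg, args*), safeLogError(msg, t, args*))
// come from the diff; the SLF4J plumbing is an assumption, and a logsafe-aware encoder is
// assumed to render the Arg parameters into the structured log output.
trait SafeLogging {
  @transient private lazy val safeLogger: Logger =
    LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

  def safeLogDebug(message: String, args: Arg[_]*): Unit =
    safeLogger.debug(message, args: _*)

  def safeLogInfo(message: String, args: Arg[_]*): Unit =
    safeLogger.info(message, args: _*)

  def safeLogWarning(message: String, args: Arg[_]*): Unit =
    safeLogger.warn(message, args: _*)

  def safeLogError(message: String, args: Arg[_]*): Unit =
    safeLogger.error(message, args: _*)

  def safeLogError(message: String, throwable: Throwable, args: Arg[_]*): Unit =
    // SLF4J treats a trailing Throwable specially, so append it after the args.
    safeLogger.error(message, (args :+ throwable): _*)
}
```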

File tree

16 files changed, +411 -209 lines changed


FORK.md

Lines changed: 7 additions & 4 deletions
@@ -10,12 +10,15 @@
 * [SPARK-18079](https://issues.apache.org/jira/browse/SPARK-18079) - CollectLimitExec.executeToIterator should perform per-partition limits
 
 * [SPARK-15777](https://issues.apache.org/jira/browse/SPARK-15777) (Partial fix) - Catalog federation
-  * make ExternalCatalog configurable beyond in memory and hive
-  * FileIndex for catalog tables is provided by external catalog instead of using default impl
+  * make ExternalCatalog configurable beyond in memory and hive
+  * FileIndex for catalog tables is provided by external catalog instead of using default impl
 
 * Better pushdown for IN expressions in parquet via UserDefinedPredicate ([SPARK-17091](https://issues.apache.org/jira/browse/SPARK-17091) for original issue)
-
+* SafeLogging implemented for the following files:
+  * core: Broadcast, CoarseGrainedExecutorBackend, CoarseGrainedSchedulerBackend, Executor, MemoryStore, SparkContext, TorrentBroadcast
+  * kubernetes: ExecutorPodsAllocator, ExecutorPodsLifecycleManager, ExecutorPodsPollingSnapshotSource, ExecutorPodsSnapshot, ExecutorPodsWatchSnapshotSource, KubernetesClusterSchedulerBackend
+  * yarn: YarnClusterSchedulerBackend, YarnSchedulerBackend
 # Added
 
 * Gradle plugin to easily create custom docker images for use with k8s
-* Filter rLibDir by exists so that daemon.R references the correct file [460](https://github.com/palantir/spark/pull/460)
+* Filter rLibDir by exists so that daemon.R references the correct file [460](https://github.com/palantir/spark/pull/460)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 64 additions & 47 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala

Lines changed: 8 additions & 3 deletions
@@ -21,8 +21,10 @@ import java.io.Serializable
 
 import scala.reflect.ClassTag
 
+import com.palantir.logsafe.{SafeArg, UnsafeArg}
+
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.util.Utils
 
 /**
@@ -54,7 +56,7 @@ import org.apache.spark.util.Utils
  * @param id A unique identifier for the broadcast variable.
  * @tparam T Type of the data contained in the broadcast variable.
  */
-abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
+abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with SafeLogging {
 
   /**
    * Flag signifying whether the broadcast variable is valid
@@ -107,7 +109,10 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo
     assertValid()
     _isValid = false
     _destroySite = Utils.getCallSite().shortForm
-    logInfo("Destroying %s (from %s)".format(toString, _destroySite))
+    safeLogInfo("Destroying",
+      SafeArg.of("id", id),
+      UnsafeArg.of("stringRepr", toString),
+      UnsafeArg.of("destroySite", _destroySite))
     doDestroy(blocking)
   }
 
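The `destroy()` change above shows the core logsafe distinction: `SafeArg` wraps values that are safe to retain and index centrally (the numeric broadcast id), while `UnsafeArg` wraps values that may carry user-controlled content (the `toString` output and the call site). Below is a minimal, self-contained sketch of how these argument wrappers behave, using only the `com.palantir.logsafe` API already imported above; the demo object and its values are made up for illustration.

```scala
import com.palantir.logsafe.{Arg, SafeArg, UnsafeArg}

// Illustrative only: named arguments carry a safety flag alongside their name and value.
object ArgDemo {
  def main(argv: Array[String]): Unit = {
    val id = SafeArg.of("id", java.lang.Long.valueOf(42L))      // safe: internal numeric identifier
    val site = UnsafeArg.of("destroySite", "Job.scala:17")      // unsafe: may embed user data

    Seq[Arg[_]](id, site).foreach { arg =>
      println(s"${arg.getName} = ${arg.getValue} (safeForLogging=${arg.isSafeForLogging})")
    }
  }
}
```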

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 14 additions & 7 deletions
@@ -25,8 +25,10 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.Random
 
+import com.palantir.logsafe.SafeArg
+
 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
@@ -55,7 +57,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea
  * @param id A unique identifier for the broadcast variable.
  */
 private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
-  extends Broadcast[T](id) with Logging with Serializable {
+  extends Broadcast[T](id) with SafeLogging with Serializable {
 
   /**
    * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
@@ -150,7 +152,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-      logDebug(s"Reading piece $pieceId of $broadcastId")
+      safeLogDebug("Reading piece",
+        SafeArg.of("pieceId", pieceId),
+        SafeArg.of("broadcastId", broadcastId))
       // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
@@ -226,10 +230,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
          throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
        }
      case None =>
-        logInfo("Started reading broadcast variable " + id)
+        safeLogInfo("Started reading broadcast variable",
+          SafeArg.of("id", id))
        val startTimeMs = System.currentTimeMillis()
        val blocks = readBlocks()
-        logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
+        safeLogInfo("Reading broadcast variable finished",
+          SafeArg.of("id", id),
+          SafeArg.of("timeUsed", Utils.getUsedTimeMs(startTimeMs)))
 
        try {
          val obj = TorrentBroadcast.unBlockifyObject[T](
@@ -276,7 +283,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 }
 
 
-private object TorrentBroadcast extends Logging {
+private object TorrentBroadcast extends SafeLogging {
 
   def blockifyObject[T: ClassTag](
       obj: T,
@@ -317,7 +324,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
-    logDebug(s"Unpersisting TorrentBroadcast $id")
+    safeLogDebug("Unpersisting TorrentBroadcast", SafeArg.of("id", id))
    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
  }
 }
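The `readBroadcastBlock` conversion above also shows how timing is handled after the change: rather than concatenating the id and the elapsed time into the message, the message stays constant ("Started/finished reading broadcast variable") and the variable parts become `SafeArg`s, so repeated events aggregate cleanly in log tooling. A standalone, hypothetical restatement of that pattern (the object and method names are made up; only the logging calls mirror the diff, and the fork's `SafeLogging` trait is assumed to be on the classpath):

```scala
import com.palantir.logsafe.SafeArg
import org.apache.spark.internal.SafeLogging  // fork-specific trait used throughout this commit

// Hypothetical, self-contained illustration of the timing pattern in
// TorrentBroadcast.readBroadcastBlock: constant messages, structured args.
object BroadcastTimingExample extends SafeLogging {
  def readWithTiming[T](id: Long)(readBlocks: => T): T = {
    safeLogInfo("Started reading broadcast variable", SafeArg.of("id", id))
    val startTimeMs = System.currentTimeMillis()
    val blocks = readBlocks
    safeLogInfo("Reading broadcast variable finished",
      SafeArg.of("id", id),
      SafeArg.of("timeUsedMs", System.currentTimeMillis() - startTimeMs))
    blocks
  }
}
```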

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 18 additions & 13 deletions
@@ -26,11 +26,13 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
+import com.palantir.logsafe.{SafeArg, UnsafeArg}
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, SafeLogging}
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -45,7 +47,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv)
-  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
+  extends ThreadSafeRpcEndpoint with ExecutorBackend with SafeLogging {
 
   private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
@@ -56,7 +58,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
 
   override def onStart() {
-    logInfo("Connecting to driver: " + driverUrl)
+    safeLogInfo("Connecting to driver", UnsafeArg.of("driverUrl", driverUrl))
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
@@ -78,7 +80,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
-      logInfo("Successfully registered with driver")
+      safeLogInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
@@ -94,7 +96,7 @@ private[spark] class CoarseGrainedExecutorBackend(
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
-        logInfo("Got assigned task " + taskDesc.taskId)
+        safeLogInfo("Got assigned task", SafeArg.of("taskId", taskDesc.taskId))
        executor.launchTask(this, taskDesc)
      }
 
@@ -107,7 +109,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case StopExecutor =>
      stopping.set(true)
-      logInfo("Driver commanded a shutdown")
+      safeLogInfo("Driver commanded a shutdown")
      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
      // a message to self to actually do the shutdown.
      self.send(Shutdown)
@@ -125,26 +127,29 @@ private[spark] class CoarseGrainedExecutorBackend(
      }.start()
 
     case UpdateDelegationTokens(tokenBytes) =>
-      logInfo(s"Received tokens of ${tokenBytes.length} bytes")
+      safeLogInfo("Received tokens", UnsafeArg.of("tokenBytesLength", tokenBytes.length))
      SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
  }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     if (stopping.get()) {
-      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
+      safeLogInfo("Driver disconnected during shutdown",
+        UnsafeArg.of("remoteAddress", remoteAddress))
    } else if (driver.exists(_.address == remoteAddress)) {
      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
        notifyDriver = false)
    } else {
-      logWarning(s"An unknown ($remoteAddress) driver disconnected.")
+      safeLogWarning("An unknown driver disconnected.",
+        UnsafeArg.of("remoteAddress", remoteAddress))
    }
  }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     val msg = StatusUpdate(executorId, taskId, state, data)
     driver match {
       case Some(driverRef) => driverRef.send(msg)
-      case None => logWarning(s"Drop $msg because has not yet connected to driver")
+      case None => safeLogWarning("Drop message because has not yet connected to driver",
+        UnsafeArg.of("msg", msg))
    }
  }
 
@@ -157,11 +162,11 @@ private[spark] class CoarseGrainedExecutorBackend(
      reason: String,
      throwable: Throwable = null,
      notifyDriver: Boolean = true) = {
-    val message = "Executor self-exiting due to : " + reason
+    val message = "Executor self-exiting"
    if (throwable != null) {
-      logError(message, throwable)
+      safeLogError(message, throwable, UnsafeArg.of("reason", reason))
    } else {
-      logError(message)
+      safeLogError(message, UnsafeArg.of("reason", reason))
    }
 
    if (notifyDriver && driver.nonEmpty) {