Skip to content

Commit b722b1c

Browse files
ueshindongjoon-hyun
authored andcommitted
[SPARK-54170][CORE] Use StructuredLogging message in Scala side
### What changes were proposed in this pull request? Uses StructuredLogging message in Scala side. ### Why are the changes needed? Follow-up of apache#52689. The StructuredLogging framework should be used. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52864 from ueshin/issues/SPARK-54170/mdc. Authored-by: Takuya Ueshin <ueshin@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 031af16 commit b722b1c

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ public enum LogKeys implements LogKey {
614614
PYTHON_WORKER_CHANNEL_IS_BLOCKING_MODE,
615615
PYTHON_WORKER_CHANNEL_IS_CONNECTED,
616616
PYTHON_WORKER_HAS_INPUTS,
617+
PYTHON_WORKER_ID,
617618
PYTHON_WORKER_IDLE_TIMEOUT,
618619
PYTHON_WORKER_IS_ALIVE,
619620
PYTHON_WORKER_MODULE,

core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong
2626
import scala.jdk.CollectionConverters._
2727

2828
import org.apache.spark.SparkEnv
29-
import org.apache.spark.internal.Logging
29+
import org.apache.spark.internal.{Logging, LogKeys}
3030
import org.apache.spark.storage.{PythonWorkerLogBlockIdGenerator, PythonWorkerLogLine, RollingLogWriter}
3131

3232
/**
@@ -64,7 +64,9 @@ private[python] class PythonWorkerLogCapture(
6464
writer.close()
6565
} catch {
6666
case e: Exception =>
67-
logWarning(s"Failed to close log writer for worker $workerId", e)
67+
logWarning(
68+
log"Failed to close log writer for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}",
69+
e)
6870
}
6971
}
7072
}
@@ -73,12 +75,14 @@ private[python] class PythonWorkerLogCapture(
7375
* Closes all active worker log writers.
7476
*/
7577
def closeAllWriters(): Unit = {
76-
workerLogWriters.values().asScala.foreach { case (writer, _) =>
78+
workerLogWriters.asScala.foreach { case (workerId, (writer, _)) =>
7779
try {
7880
writer.close()
7981
} catch {
8082
case e: Exception =>
81-
logWarning("Failed to close log writer", e)
83+
logWarning(
84+
log"Failed to close log writer for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}",
85+
e)
8286
}
8387
}
8488
workerLogWriters.clear()
@@ -128,7 +132,8 @@ private[python] class PythonWorkerLogCapture(
128132
}
129133
} catch {
130134
case e: Exception =>
131-
logWarning(s"Failed to write log for worker $workerId", e)
135+
logWarning(
136+
log"Failed to write log for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}", e)
132137
}
133138
}
134139
prefix

0 commit comments

Comments
 (0)