Skip to content

Commit 25b97a4

Browse files
ankuriitg authored and Marcelo Vanzin committed
[SPARK-26753][CORE] Fixed custom log levels for spark-shell by using Filter instead of Threshold
This fix replaces the Threshold with a Filter for ConsoleAppender which checks to ensure that either the logLevel is greater than thresholdLevel (shell log level) or the log originated from a custom-defined logger. In these cases, it lets a log event go through; otherwise it doesn't.

1. Ensured that the custom log level works when set by default (via log4j.properties)
2. Ensured that logs are not printed twice when the log level is changed by setLogLevel
3. Ensured that custom logs are printed when the log level is changed back by setLogLevel

Closes apache#23675 from ankuriitg/ankurgupta/SPARK-26753.

Authored-by: ankurgupta <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent a8da410 commit 25b97a4

File tree

3 files changed

+94
-20
lines changed

3 files changed

+94
-20
lines changed

core/src/main/scala/org/apache/spark/internal/Logging.scala

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717

1818
package org.apache.spark.internal
1919

20-
import java.util.concurrent.ConcurrentHashMap
21-
2220
import scala.collection.JavaConverters._
2321

2422
import org.apache.log4j._
23+
import org.apache.log4j.spi.{Filter, LoggingEvent}
2524
import org.slf4j.{Logger, LoggerFactory}
2625
import org.slf4j.impl.StaticLoggerBinder
2726

@@ -154,16 +153,10 @@ trait Logging {
154153
System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
155154
"For SparkR, use setLogLevel(newLevel).")
156155
}
156+
Logging.sparkShellThresholdLevel = replLevel
157157
rootLogger.getAllAppenders().asScala.foreach {
158158
case ca: ConsoleAppender =>
159-
Option(ca.getThreshold()) match {
160-
case Some(t) =>
161-
Logging.consoleAppenderToThreshold.put(ca, t)
162-
if (!t.isGreaterOrEqual(replLevel)) {
163-
ca.setThreshold(replLevel)
164-
}
165-
case None => ca.setThreshold(replLevel)
166-
}
159+
ca.addFilter(new SparkShellLoggingFilter())
167160
case _ => // no-op
168161
}
169162
}
@@ -182,7 +175,7 @@ private[spark] object Logging {
182175
@volatile private var initialized = false
183176
@volatile private var defaultRootLevel: Level = null
184177
@volatile private var defaultSparkLog4jConfig = false
185-
private val consoleAppenderToThreshold = new ConcurrentHashMap[ConsoleAppender, Priority]()
178+
@volatile private[spark] var sparkShellThresholdLevel: Level = null
186179

187180
val initLock = new Object()
188181
try {
@@ -211,11 +204,7 @@ private[spark] object Logging {
211204
} else {
212205
val rootLogger = LogManager.getRootLogger()
213206
rootLogger.setLevel(defaultRootLevel)
214-
rootLogger.getAllAppenders().asScala.foreach {
215-
case ca: ConsoleAppender =>
216-
ca.setThreshold(consoleAppenderToThreshold.get(ca))
217-
case _ => // no-op
218-
}
207+
sparkShellThresholdLevel = null
219208
}
220209
}
221210
this.initialized = false
@@ -229,3 +218,31 @@ private[spark] object Logging {
229218
"org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
230219
}
231220
}
221+
222+
/**
 * Log4j filter attached to the spark-shell ConsoleAppender so that custom log
 * levels keep working after the shell lowers the root logger's verbosity.
 */
private class SparkShellLoggingFilter extends Filter {

  /**
   * Decides whether a log event should reach the console appender.
   *
   * If sparkShellThresholdLevel is not defined, this filter is a no-op.
   * If the log level of the event is not equal to the root level, the event is
   * allowed. Otherwise, the decision is made based on whether the log came from
   * the root configuration or from a custom-configured logger: an explicitly set
   * level on the event's logger or any non-root ancestor means a custom
   * configuration applies and the event is allowed.
   *
   * @param loggingEvent the log4j event being filtered
   * @return Filter.NEUTRAL to let the event through, Filter.DENY to drop it
   */
  def decide(loggingEvent: LoggingEvent): Int = {
    if (Logging.sparkShellThresholdLevel == null) {
      Filter.NEUTRAL
    } else if (loggingEvent.getLevel() != LogManager.getRootLogger().getLevel()) {
      // Compare levels by value rather than by reference (`eq`): log4j's built-in
      // levels are singletons, but custom Level instances with the same numeric
      // value must be treated as equal too.
      Filter.NEUTRAL
    } else {
      // Walk the logger hierarchy up to (but excluding) the root logger. The root
      // always has a level set, so including it would make the check vacuous.
      var logger = loggingEvent.getLogger()
      var customConfigured = false
      while (!customConfigured && logger.getParent() != null) {
        customConfigured = logger.getLevel() != null
        logger = logger.getParent()
      }
      if (customConfigured) Filter.NEUTRAL else Filter.DENY
    }
  }
}

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2284,10 +2284,8 @@ private[spark] object Utils extends Logging {
22842284
/**
 * Configures the log4j root logger to the given level for the whole application.
 *
 * @param l the new root log level
 */
def setLogLevel(l: org.apache.log4j.Level): Unit = {
  val rootLogger = org.apache.log4j.Logger.getRootLogger()
  rootLogger.setLevel(l)
  // Setting threshold to null as rootLevel will define log level for spark-shell:
  // once the user picks a level explicitly, the shell's own threshold no longer applies.
  Logging.sparkShellThresholdLevel = null
}
22922290

22932291
/**
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.internal
19+
20+
import org.apache.log4j.{Level, Logger}
21+
import org.apache.log4j.spi.{Filter, LoggingEvent}
22+
23+
import org.apache.spark.SparkFunSuite
24+
import org.apache.spark.util.Utils
25+
26+
class LoggingSuite extends SparkFunSuite {

  test("spark-shell logging filter") {
    val filter = new SparkShellLoggingFilter()
    val rootLogger = Logger.getRootLogger()
    val savedRootLevel = rootLogger.getLevel()
    rootLogger.setLevel(Level.INFO)
    val savedThreshold = Logging.sparkShellThresholdLevel
    Logging.sparkShellThresholdLevel = Level.WARN
    try {
      // An event at exactly the root level from an unconfigured logger is dropped.
      val logger = Logger.getLogger("a.b.c.D")
      val infoEvent = new LoggingEvent(logger.getName(), logger, Level.INFO, "Test", null)
      assert(filter.decide(infoEvent) === Filter.DENY)

      // An event below the threshold but different from the root level passes.
      val debugEvent = new LoggingEvent(logger.getName(), logger, Level.DEBUG, "Test", null)
      assert(filter.decide(debugEvent) != Filter.DENY)

      // Once an ancestor logger has an explicit level, the same event passes.
      val parentLogger = Logger.getLogger("a.b.c")
      parentLogger.setLevel(Level.INFO)
      assert(filter.decide(infoEvent) != Filter.DENY)

      // After setLogLevel clears the threshold, root-level events pass as well.
      val otherLogger = Logger.getLogger("a.b.E")
      val otherEvent = new LoggingEvent(otherLogger.getName(), otherLogger, Level.INFO, "Test", null)
      Utils.setLogLevel(Level.INFO)
      assert(filter.decide(otherEvent) != Filter.DENY)
    } finally {
      rootLogger.setLevel(savedRootLevel)
      Logging.sparkShellThresholdLevel = savedThreshold
    }
  }
}

0 commit comments

Comments (0)