
Commit c13b60e

Marcelo Vanzin authored and cloud-fan committed
[SPARK-22533][CORE] Handle deprecated names in ConfigEntry.
This change hooks up the config reader to `SparkConf.getDeprecatedConfig`, so that config constants with deprecated names generate the proper warnings. It also moves two deprecated configs from the new "alternatives" system to the old deprecation system, since the two are not yet hooked up to each other.

Added a few unit tests to verify the desired behavior.

Author: Marcelo Vanzin <[email protected]>

Closes #19760 from vanzin/SPARK-22533.
1 parent 3c3eebc commit c13b60e

File tree

4 files changed: +20 −9 lines

core/src/main/scala/org/apache/spark/SparkConf.scala
core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
core/src/main/scala/org/apache/spark/internal/config/package.scala
core/src/test/scala/org/apache/spark/SparkConfSuite.scala
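In effect, reading a config constant now falls back to its deprecated name and logs the proper warning. A minimal sketch of the new behavior, modeled on the unit tests added below (Spark-internal code, since the config constants are private[spark]):

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf()
// Set only the deprecated name; the entry's real key is
// "spark.scheduler.listenerbus.eventqueue.capacity".
conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
// The config reader now falls back to the deprecated key and warns about it.
assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) == 84)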

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 10 additions & 6 deletions
@@ -17,13 +17,15 @@
 
 package org.apache.spark
 
+import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.serializer.KryoSerializer
@@ -370,7 +372,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
+    Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings))
   }
 
   /** Get an optional value, applying variable substitution. */
@@ -622,7 +624,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.history.updateInterval", "1.3")),
     "spark.history.fs.cleaner.interval" -> Seq(
       AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
-    "spark.history.fs.cleaner.maxAge" -> Seq(
+    MAX_LOG_AGE_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
     "spark.yarn.am.waitTime" -> Seq(
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
@@ -663,8 +665,10 @@
       AlternateConfig("spark.yarn.jar", "2.0")),
     "spark.yarn.access.hadoopFileSystems" -> Seq(
       AlternateConfig("spark.yarn.access.namenodes", "2.2")),
-    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
-      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"))
+    MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
+    LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
+      AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
   )
 
   /**
@@ -704,9 +708,9 @@ private[spark] object SparkConf extends Logging {
    * Looks for available deprecated keys for the given config option, and return the first
    * value available.
    */
-  def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
+  def getDeprecatedConfig(key: String, conf: JMap[String, String]): Option[String] = {
     configsWithAlternatives.get(key).flatMap { alts =>
-      alts.collectFirst { case alt if conf.contains(alt.key) =>
+      alts.collectFirst { case alt if conf.containsKey(alt.key) =>
         val value = conf.get(alt.key)
         if (alt.translation != null) alt.translation(value) else value
       }
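The signature change is what makes the hook-up possible: getDeprecatedConfig now accepts any JMap[String, String], so SparkConf (via its internal settings map) and SparkConfigProvider can share the same lookup. A rough sketch of the new call shape (hypothetical values; the method is private[spark]):

import java.util.{HashMap => JHashMap}
import org.apache.spark.SparkConf

val settings = new JHashMap[String, String]()
settings.put("spark.reducer.maxReqSizeShuffleToMem", "1024")
// The deprecation table is keyed by the new name; the lookup finds the old key
// and, since this AlternateConfig has no translation, returns the raw value.
assert(SparkConf.getDeprecatedConfig("spark.maxRemoteBlockSizeFetchToMem", settings) == Some("1024"))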

core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -19,6 +19,8 @@ package org.apache.spark.internal.config
 
 import java.util.{Map => JMap}
 
+import org.apache.spark.SparkConf
+
 /**
  * A source of configuration values.
  */
@@ -53,7 +55,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
 
   override def get(key: String): Option[String] = {
     if (key.startsWith("spark.")) {
-      Option(conf.get(key))
+      Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf))
     } else {
       None
     }
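This one-line change is the hook-up the commit message describes: every ConfigEntry read goes through SparkConfigProvider.get, so the deprecated-name fallback (and its warning) now applies on that path as well. A minimal sketch mirroring the new unit test (assumes Spark-internal access to MAX_LOG_AGE_S):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.config._

val conf = new SparkConf()
conf.set("spark.history.fs.cleaner.maxAge.seconds", "42") // deprecated name only
// The provider misses on "spark.history.fs.cleaner.maxAge", then falls back to
// SparkConf.getDeprecatedConfig, which finds the old key and logs a warning.
assert(conf.get(MAX_LOG_AGE_S) == 42L)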

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 0 additions & 2 deletions
@@ -209,7 +209,6 @@ package object config {
 
   private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
-      .withAlternative("spark.scheduler.listenerbus.eventqueue.size")
       .intConf
       .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
       .createWithDefault(10000)
@@ -404,7 +403,6 @@ package object config {
         "affect both shuffle fetch and block manager remote block fetch. For users who " +
         "enabled external shuffle service, this feature can only be worked when external shuffle" +
         " service is newer than Spark 2.2.")
-      .withAlternative("spark.reducer.maxReqSizeShuffleToMem")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
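For contrast, a hypothetical entry (the keys below are made up, not real Spark configs) showing the builder-level alternative mechanism these two entries are moving away from:

// Hypothetical illustration only; not part of this commit.
private[spark] val EXAMPLE_ENTRY =
  ConfigBuilder("spark.example.new")
    .withAlternative("spark.example.old")
    .intConf
    .createWithDefault(1)

Until withAlternative emits the same deprecation warnings, entries that need a warning live in SparkConf.configsWithAlternatives instead.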
core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -26,6 +26,7 @@ import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
 
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
@@ -248,6 +249,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
 
     conf.set("spark.kryoserializer.buffer.mb", "1.1")
     assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
+
+    conf.set("spark.history.fs.cleaner.maxAge.seconds", "42")
+    assert(conf.get(MAX_LOG_AGE_S) === 42L)
+
+    conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
+    assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)
   }
 
   test("akka deprecated configs") {
