
Commit ca861fe

10110346 authored and cloud-fan committed
[SPARK-25300][CORE] Unified the configuration parameter spark.shuffle.service.enabled
## What changes were proposed in this pull request?

The configuration parameter `spark.shuffle.service.enabled` is already defined in `package.scala` but is still referenced by its raw string in many places, so we can replace those usages with `SHUFFLE_SERVICE_ENABLED`. The configuration parameter `spark.shuffle.service.port` is unified in the same way.

## How was this patch tested?

N/A

Closes apache#22306 from 10110346/unifiedserviceenable.

Authored-by: liuxian <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 103f513 commit ca861fe
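
Every change in this commit follows the same pattern: a raw string key plus an inline default is replaced by a typed `ConfigEntry` lookup. A minimal sketch of the before/after (a standalone snippet, not lifted from the patch; note the entries are `private[spark]`, so this pattern applies inside Spark's own packages):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

val conf = new SparkConf()

// Before: string key and default value repeated at every call site.
val enabledOld = conf.getBoolean("spark.shuffle.service.enabled", false)

// After: the key, type, and default live in one place (config/package.scala).
val enabledNew = conf.get(config.SHUFFLE_SERVICE_ENABLED)
```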

File tree

15 files changed: +36 -29 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
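
For context, this guard means dynamic allocation refuses to start without the external shuffle service, since shuffle output must outlive the executors that wrote it. A minimal conforming configuration might look like this (a sketch; property values are illustrative):

```scala
import org.apache.spark.SparkConf

// Dynamic allocation requires the external shuffle service so that
// shuffle files survive executor removal.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
```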

core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala

Lines changed: 4 additions & 4 deletions
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
   protected val masterMetricsSystem =
     MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
 
-  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
-  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
+  private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -131,7 +131,7 @@ object ExternalShuffleService extends Logging {
 
     // we override this value since this service is started from the command line
     // and we assume the user really wants it to be running
-    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 
core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.Utils
 
@@ -52,7 +52,7 @@ class LocalSparkCluster(
   // Disable REST server on Master in this mode unless otherwise specified
   val _conf = conf.clone()
     .setIfMissing("spark.master.rest.enabled", "false")
-    .set("spark.shuffle.service.enabled", "false")
+    .set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
 
   /* Start the Master */
   val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -773,7 +773,7 @@ private[deploy] object Worker extends Logging {
     // bound, we may launch no more than one external shuffle service on each host.
     // When this happens, we should give explicit reason of failure instead of fail silently. For
     // more detail see SPARK-20989.
-    val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
     require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
       "Starting multiple workers on one host is failed because we may launch no more than one " +

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 3 additions & 0 deletions
@@ -144,6 +144,9 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_PORT =
+    ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
+
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
     .stringConf.createOptional
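
Each `ConfigBuilder` entry bundles the key, value type, and default, which is what the rest of the patch consumes. A short sketch of the accessors the other files rely on (Spark-internal code only, since the entries are `private[spark]`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

val conf = new SparkConf()

// Typed read: returns an Int, falling back to the default (7337).
val port: Int = conf.get(config.SHUFFLE_SERVICE_PORT)

// String-based APIs keep working through the entry's metadata.
conf.set(config.SHUFFLE_SERVICE_PORT.key, "7447")            // key = "spark.shuffle.service.port"
val dflt = config.SHUFFLE_SERVICE_PORT.defaultValueString    // "7337"
```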

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 3 deletions
@@ -130,7 +130,7 @@ private[spark] class BlockManager(
   extends BlockDataManager with BlockEvictionHandler with Logging {
 
   private[spark] val externalShuffleServiceEnabled =
-    conf.getBoolean("spark.shuffle.service.enabled", false)
+    conf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val chunkSize =
     conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
@@ -165,12 +165,13 @@ private[spark] class BlockManager(
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
   private val externalShuffleServicePort = {
-    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+    val tmpPort = Utils.getSparkOrYarnConfig(conf, config.SHUFFLE_SERVICE_PORT.key,
+      config.SHUFFLE_SERVICE_PORT.defaultValueString).toInt
     if (tmpPort == 0) {
       // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
       // an open port. But we still need to tell our spark apps the right port to use. So
       // only if the yarn config has the port set to 0, we prefer the value in the spark config
-      conf.get("spark.shuffle.service.port").toInt
+      conf.get(config.SHUFFLE_SERVICE_PORT.key).toInt
     } else {
       tmpPort
     }
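
The resolution above prefers the Hadoop/YARN configuration (the service may be launched inside the YARN NodeManager) and treats port 0 as a testing convention meaning "YARN picked a free port", in which case the Spark config holds the port apps should actually use. The decision, extracted as a pure function (a sketch for illustration, not part of the patch):

```scala
// yarnOrSparkValue: the result of the Utils.getSparkOrYarnConfig lookup;
// sparkValue: the port recorded in the Spark config itself.
def resolveShuffleServicePort(yarnOrSparkValue: Int, sparkValue: Int): Int =
  if (yarnOrSparkValue == 0) sparkValue else yarnOrSparkValue

// resolveShuffleServicePort(0, 7337)    == 7337 (YARN picked a free port)
// resolveShuffleServicePort(7447, 7337) == 7447 (explicit port wins)
```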

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
@@ -822,7 +822,7 @@ private[spark] object Utils extends Logging {
    * logic of locating the local directories according to deployment mode.
    */
   def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
-    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1092,7 +1093,7 @@ class ExecutorAllocationManagerSuite
     val maxExecutors = 2
     val conf = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
       .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
@@ -42,8 +43,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     server = transportContext.createServer()
 
     conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", server.getPort.toString)
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString)
   }
 
   override def afterAll() {

core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -458,7 +458,7 @@ class StandaloneDynamicAllocationSuite
     val initialExecutorLimit = 1
     val myConf = appConf
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
      .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
     sc = new SparkContext(myConf)
     val appId = sc.applicationId
