
Commit 833a9f1

Nishchal Venkataramana authored and dbtsai committed
[SPARK-24203][CORE] Make executor's bindAddress configurable
### What changes were proposed in this pull request?

With this change, the executor's bindAddress is passed as an input parameter to RpcEnv.create. A previous PR, apache#21261, addressed the same issue but read the bindAddress from a Spark conf property, which would not have worked for multiple executors. This PR enables anyone overriding CoarseGrainedExecutorBackend with a custom implementation to invoke CoarseGrainedExecutorBackend.main() with the option to configure bindAddress.

### Why are the changes needed?

This is required when Kernel-based Virtual Machines (KVMs) are used inside a Linux container, where the executor's hostname is not the same as the container hostname.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Tested by running jobs with executors on KVMs inside a Linux container.

Closes apache#26331 from nishchalv/SPARK-29670.

Lead-authored-by: Nishchal Venkataramana <[email protected]>
Co-authored-by: nishchal <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
1 parent 363af16 commit 833a9f1
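
The custom-backend workflow the commit message describes can be sketched as follows. This is a hedged illustration, not part of the commit: the object name `MyExecutorBackend` is hypothetical, and the object must live under the `org.apache.spark` package tree since `parseArguments` and `run` are `private[spark]`. The in-tree example of the same pattern is YarnCoarseGrainedExecutorBackend at the end of this diff.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnv

// Hypothetical custom backend entry point: parseArguments now understands
// --bind-address, and Arguments.bindAddress is threaded into the backend.
private[spark] object MyExecutorBackend extends Logging {
  def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores,
        arguments.userClassPath, env, arguments.resourcesFileOpt)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))
    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
  }
}
```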

4 files changed, +66 −14 lines


core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 13 additions & 1 deletion
```diff
@@ -196,14 +196,15 @@ object SparkEnv extends Logging {
   private[spark] def createExecutorEnv(
       conf: SparkConf,
       executorId: String,
+      bindAddress: String,
       hostname: String,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
     val env = create(
       conf,
       executorId,
-      hostname,
+      bindAddress,
       hostname,
       None,
       isLocal,
@@ -214,6 +215,17 @@ object SparkEnv extends Logging {
     env
   }
 
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      numCores: Int,
+      ioEncryptionKey: Option[Array[Byte]],
+      isLocal: Boolean): SparkEnv = {
+    createExecutorEnv(conf, executorId, hostname,
+      hostname, numCores, ioEncryptionKey, isLocal)
+  }
+
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
```
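
The effect of the two overloads above can be sketched with illustrative values. Both methods are `private[spark]` and need a live driver to connect to, so this shows the shape of the calls rather than a standalone program:

```scala
import org.apache.spark.{SparkConf, SparkEnv}

def overloadSketch(conf: SparkConf): Unit = {
  // Old 6-argument form still compiles: it forwards hostname as the bind
  // address, so the executor binds and advertises the same name.
  SparkEnv.createExecutorEnv(conf, "1", "host1", 4, None, isLocal = false)
  // New 7-argument form: bind on the KVM/container-local address while
  // still advertising the externally reachable hostname.
  SparkEnv.createExecutorEnv(conf, "1", "10.0.0.5", "host1", 4, None, isLocal = false)
}
```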

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 17 additions & 4 deletions
```diff
@@ -46,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
@@ -227,6 +228,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   case class Arguments(
       driverUrl: String,
       executorId: String,
+      bindAddress: String,
       hostname: String,
       cores: Int,
       appId: String,
@@ -238,7 +240,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
         arguments.resourcesFileOpt)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -259,10 +261,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val executorConf = new SparkConf
     val fetcher = RpcEnv.create(
       "driverPropsFetcher",
+      arguments.bindAddress,
       arguments.hostname,
       -1,
       executorConf,
       new SecurityManager(executorConf),
+      numUsableCores = 0,
       clientMode = true)
 
     var driver: RpcEndpointRef = null
@@ -297,8 +301,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     driverConf.set(EXECUTOR_ID, arguments.executorId)
-    val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
-      arguments.cores, cfg.ioEncryptionKey, isLocal = false)
+    val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
+      arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
     env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
     arguments.workerUrl.foreach { url =>
@@ -311,6 +315,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
     var driverUrl: String = null
     var executorId: String = null
+    var bindAddress: String = null
     var hostname: String = null
     var cores: Int = 0
     var resourcesFileOpt: Option[String] = None
@@ -327,6 +332,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       case ("--executor-id") :: value :: tail =>
         executorId = value
         argv = tail
+      case ("--bind-address") :: value :: tail =>
+        bindAddress = value
+        argv = tail
       case ("--hostname") :: value :: tail =>
         hostname = value
         argv = tail
@@ -364,7 +372,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       printUsageAndExit(classNameForEntry)
     }
 
-    Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
+    if (bindAddress == null) {
+      bindAddress = hostname
+    }
+
+    Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
       userClassPath, resourcesFileOpt)
   }
 
@@ -377,6 +389,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       | Options are:
       |   --driver-url <driverUrl>
       |   --executor-id <executorId>
+      |   --bind-address <bindAddress>
      |   --hostname <hostname>
       |   --cores <cores>
       |   --resourcesFile <fileWithJSONResourceInformation>
```
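
The driverPropsFetcher hunk above is where the new argument reaches the transport layer. A hedged sketch of that RpcEnv.create overload (a package-internal API; the addresses here are illustrative), assuming the second parameter is the local bind address and the third the advertised address, as the call site suggests:

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

def fetcherSketch(): RpcEnv = {
  val conf = new SparkConf()
  RpcEnv.create(
    "driverPropsFetcher",
    "10.0.0.5",               // bind address: the NIC inside the KVM/container
    "kvm-guest.example.com",  // advertised address: how peers reach this env
    -1,                       // port; client mode picks an ephemeral port
    conf,
    new SecurityManager(conf),
    numUsableCores = 0,
    clientMode = true)
}
```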

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

Lines changed: 33 additions & 8 deletions
```diff
@@ -54,7 +54,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
@@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -101,7 +101,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     withTempDir { tmpDir =>
@@ -129,7 +129,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     // not enough gpu's on the executor
@@ -168,7 +168,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     // executor resources < required
@@ -200,7 +200,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     val parsedResources = backend.parseOrFindResources(None)
@@ -226,7 +226,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
@@ -254,7 +254,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
     val env = createMockEnv(conf, serializer, Some(rpcEnv))
     backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-      "host1", 4, Seq.empty[URL], env, None)
+      "host1", "host1", 4, Seq.empty[URL], env, None)
     assert(backend.taskResources.isEmpty)
 
     val taskId = 1000000
@@ -289,6 +289,31 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-24203 when bindAddress is not set, it defaults to hostname") {
+    val args1 = Array(
+      "--driver-url", "driverurl",
+      "--executor-id", "1",
+      "--hostname", "host1",
+      "--cores", "1",
+      "--app-id", "app1")
+
+    val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
+    assert(arg.bindAddress == "host1")
+  }
+
+  test("SPARK-24203 when bindAddress is different, it does not default to hostname") {
+    val args1 = Array(
+      "--driver-url", "driverurl",
+      "--executor-id", "1",
+      "--hostname", "host1",
+      "--bind-address", "bindaddress1",
+      "--cores", "1",
+      "--app-id", "app1")
+
+    val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
+    assert(arg.bindAddress == "bindaddress1")
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
       rpcEnv: Option[RpcEnv] = None): SparkEnv = {
     val mockEnv = mock[SparkEnv]
```

resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -34,6 +34,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
@@ -43,6 +44,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     rpcEnv,
     driverUrl,
     executorId,
+    bindAddress,
     hostname,
     cores,
     userClassPath,
@@ -68,7 +70,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
         arguments.resourcesFileOpt)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
```
