Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit d7dd259

Browse files
hex108 authored and liyinan926 committed
Refactor for Hadoop conf related code (#596)
1 parent 6d724a9 commit d7dd259

File tree

6 files changed

+45
-44
lines changed

6 files changed

+45
-44
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ private[spark] trait HadoopConfBootstrap {
4040

4141
private[spark] class HadoopConfBootstrapImpl(
4242
hadoopConfConfigMapName: String,
43-
hadoopConfigFiles: Seq[File],
44-
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {
43+
hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging {
4544

4645
override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
4746
: PodWithMainContainer = {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s
18+
19+
import java.io.File
20+
21+
private[spark] object HadoopConfUtils {
22+
23+
def getHadoopConfFiles(path: String) : Seq[File] = {
24+
val dir = new File(path)
25+
if (dir.isDirectory) {
26+
dir.listFiles.flatMap { file => Some(file).filter(_.isFile) }.toSeq
27+
} else {
28+
Seq.empty[File]
29+
}
30+
}
31+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
1919
import java.io.File
2020

2121
import org.apache.spark.SparkConf
22-
import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtilImpl, OptionRequirements}
22+
import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, OptionRequirements}
2323
import org.apache.spark.deploy.k8s.HadoopConfSparkUserBootstrapImpl
2424
import org.apache.spark.deploy.k8s.config._
2525
import org.apache.spark.internal.Logging
@@ -43,7 +43,7 @@ private[spark] class HadoopStepsOrchestrator(
4343
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
4444
private val maybeRenewerPrincipal =
4545
submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL)
46-
private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir)
46+
private val hadoopConfigurationFiles = HadoopConfUtils.getHadoopConfFiles(hadoopConfDir)
4747
private val hadoopUGI = new HadoopUGIUtilImpl
4848
logInfo(s"Hadoop Conf directory: $hadoopConfDir")
4949

@@ -70,8 +70,7 @@ private[spark] class HadoopStepsOrchestrator(
7070
def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
7171
val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
7272
hadoopConfigMapName,
73-
hadoopConfigurationFiles,
74-
hadoopUGI)
73+
hadoopConfigurationFiles)
7574
val hadoopConfMounterStep = new HadoopConfMounterStep(
7675
hadoopConfigMapName,
7776
hadoopConfigurationFiles,
@@ -95,13 +94,4 @@ private[spark] class HadoopStepsOrchestrator(
9594
}
9695
Seq(hadoopConfMounterStep) ++ maybeKerberosStep.toSeq
9796
}
98-
99-
private def getHadoopConfFiles(path: String) : Seq[File] = {
100-
val dir = new File(path)
101-
if (dir.isDirectory) {
102-
dir.listFiles.flatMap { file => Some(file).filter(_.isFile) }.toSeq
103-
} else {
104-
Seq.empty[File]
105-
}
106-
}
10797
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.io.File
2121
import io.fabric8.kubernetes.client.Config
2222

2323
import org.apache.spark.SparkContext
24-
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
24+
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
2525
import org.apache.spark.deploy.k8s.config._
2626
import org.apache.spark.deploy.k8s.constants._
2727
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
@@ -87,14 +87,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
8787
sparkConf)
8888
}
8989

90-
val hadoopUtil = new HadoopUGIUtilImpl
9190
val hadoopBootStrap = maybeHadoopConfigMap.map{ hadoopConfigMap =>
9291
val hadoopConfigurations = maybeHadoopConfDir.map(
93-
conf_dir => getHadoopConfFiles(conf_dir)).getOrElse(Array.empty[File])
92+
conf_dir => HadoopConfUtils.getHadoopConfFiles(conf_dir)).getOrElse(Seq.empty[File])
9493
new HadoopConfBootstrapImpl(
9594
hadoopConfigMap,
96-
hadoopConfigurations,
97-
hadoopUtil)
95+
hadoopConfigurations)
9896
}
9997

10098
val kerberosBootstrap =
@@ -109,6 +107,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
109107
Utils.getCurrentUserName() ) }
110108
}
111109

110+
val hadoopUtil = new HadoopUGIUtilImpl
112111
val hadoopUserBootstrap =
113112
if (hadoopBootStrap.isDefined && kerberosBootstrap.isEmpty) {
114113
Some(new HadoopConfSparkUserBootstrapImpl(hadoopUtil))
@@ -202,13 +201,4 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
202201
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
203202
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
204203
}
205-
206-
private def getHadoopConfFiles(path: String) : Array[File] = {
207-
val dir = new File(path)
208-
if (dir.isDirectory) {
209-
dir.listFiles.flatMap { file => Some(file).filter(_.isFile) }
210-
} else {
211-
Array.empty[File]
212-
}
213-
}
214204
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@ import scala.collection.JavaConverters._
2323

2424
import com.google.common.io.Files
2525
import io.fabric8.kubernetes.api.model._
26-
import org.mockito.{Mock, MockitoAnnotations}
27-
import org.mockito.Mockito.when
26+
import org.mockito.MockitoAnnotations
2827
import org.scalatest.BeforeAndAfter
2928

3029
import org.apache.spark.SparkFunSuite
31-
import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtilImpl, PodWithMainContainer}
30+
import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, PodWithMainContainer}
3231
import org.apache.spark.deploy.k8s.constants._
3332
import org.apache.spark.util.Utils
3433

@@ -38,19 +37,14 @@ private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeA
3837
private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
3938
private val SPARK_USER_VALUE = "sparkUser"
4039

41-
@Mock
42-
private var hadoopUtil: HadoopUGIUtilImpl = _
43-
4440
before {
4541
MockitoAnnotations.initMocks(this)
46-
when(hadoopUtil.getShortUserName).thenReturn(SPARK_USER_VALUE)
4742
}
4843

4944
test("Test of bootstrapping hadoop_conf_dir files") {
5045
val hadoopConfStep = new HadoopConfBootstrapImpl(
5146
CONFIG_MAP_NAME,
52-
HADOOP_FILES,
53-
hadoopUtil)
47+
HADOOP_FILES)
5448
val expectedKeyPaths = Seq(
5549
new KeyToPathBuilder()
5650
.withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
351351
val hadoopFiles = Seq(hadoopFile)
352352
val hadoopBootsrap = new HadoopConfBootstrapImpl(
353353
hadoopConfConfigMapName = configName,
354-
hadoopConfigFiles = hadoopFiles,
355-
hadoopUGI = hadoopUGI)
354+
hadoopConfigFiles = hadoopFiles)
356355

357356
val factory = new ExecutorPodFactoryImpl(
358357
conf,
@@ -388,8 +387,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
388387
val hadoopFiles = Seq(hadoopFile)
389388
val hadoopBootstrap = new HadoopConfBootstrapImpl(
390389
hadoopConfConfigMapName = configName,
391-
hadoopConfigFiles = hadoopFiles,
392-
hadoopUGI = hadoopUGI)
390+
hadoopConfigFiles = hadoopFiles)
393391
val hadoopUserBootstrap = new HadoopConfSparkUserBootstrapImpl(hadoopUGI)
394392

395393
val factory = new ExecutorPodFactoryImpl(
@@ -427,8 +425,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
427425
val hadoopFiles = Seq(hadoopFile)
428426
val hadoopBootstrap = new HadoopConfBootstrapImpl(
429427
hadoopConfConfigMapName = configName,
430-
hadoopConfigFiles = hadoopFiles,
431-
hadoopUGI = hadoopUGI)
428+
hadoopConfigFiles = hadoopFiles)
432429
val secretName = "secret-test"
433430
val secretItemKey = "item-test"
434431
val userName = "sparkUser"

0 commit comments

Comments (0)