Skip to content

Commit 6052a13

Browse files
committed
mocking hadoopUGI
1 parent 6efa379 commit 6052a13

File tree

8 files changed

+129
-30
lines changed

8 files changed

+129
-30
lines changed

docs/running-on-kubernetes.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,7 @@ from the other deployment modes. See the [configuration page](configuration.html
816816
<td>
817817
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
818818
the name of the secret where your existing delegation token data is stored. You must also specify the
819-
label <code>spark.kubernetes.kerberos.tokensecret.name</code> where your data is stored on the secret.
819+
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where your data is stored on the secret.
820820
</td>
821821
</tr>
822822
<tr>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import java.io.File
2121
import scala.collection.JavaConverters._
2222

2323
import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}
24-
import org.apache.hadoop.security.UserGroupInformation
2524

2625
import org.apache.spark.deploy.kubernetes.constants._
2726
import org.apache.spark.internal.Logging
@@ -42,7 +41,8 @@ private[spark] trait HadoopConfBootstrap {
4241

4342
private[spark] class HadoopConfBootstrapImpl(
4443
hadoopConfConfigMapName: String,
45-
hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging{
44+
hadoopConfigFiles: Seq[File],
45+
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging{
4646

4747
override def bootstrapMainContainerAndVolumes(
4848
originalPodWithMainContainer: PodWithMainContainer)
@@ -76,7 +76,7 @@ private[spark] class HadoopConfBootstrapImpl(
7676
.endEnv()
7777
.addNewEnv()
7878
.withName(ENV_SPARK_USER)
79-
.withValue(UserGroupInformation.getCurrentUser.getShortUserName)
79+
.withValue(hadoopUGI.getShortName)
8080
.endEnv()
8181
.build()
8282
originalPodWithMainContainer.copy(
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import org.apache.hadoop.security.UserGroupInformation
20+
21+
/**
 * Thin, injectable wrapper around Hadoop's static [[UserGroupInformation]] API.
 * Routing these calls through an instance lets tests mock them (the static
 * UGI methods cannot be stubbed directly with Mockito).
 */
private[spark] class HadoopUGIUtil{
  // Returns the UGI of the user currently logged in to this JVM.
  def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
  // Short user name (realm/host parts stripped) of the current user.
  def getShortName: String = getCurrentUser.getShortUserName
  // Whether Hadoop security (e.g. Kerberos) is enabled for this JVM.
  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
  // Logs in from the given keytab and returns the resulting UGI,
  // without replacing the process-wide login user.
  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import io.fabric8.kubernetes.api.model.SecretBuilder
2626
import org.apache.commons.codec.binary.Base64
2727
import org.apache.hadoop.conf.Configuration
2828
import org.apache.hadoop.fs.FileSystem
29-
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
29+
import org.apache.hadoop.security.Credentials
3030
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
3131
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
3232

3333
import org.apache.spark.SparkConf
3434
import org.apache.spark.deploy.SparkHadoopUtil
35-
import org.apache.spark.deploy.kubernetes.{KerberosTokenConfBootstrapImpl, PodWithMainContainer}
35+
import org.apache.spark.deploy.kubernetes.{HadoopUGIUtil, KerberosTokenConfBootstrapImpl, PodWithMainContainer}
3636
import org.apache.spark.deploy.kubernetes.constants._
3737
import org.apache.spark.internal.Logging
3838

@@ -51,16 +51,18 @@ import org.apache.spark.internal.Logging
5151
private[spark] class HadoopKerberosKeytabResolverStep(
5252
submissionSparkConf: SparkConf,
5353
maybePrincipal: Option[String],
54-
maybeKeytab: Option[File]) extends HadoopConfigurationStep with Logging{
55-
private var originalCredentials: Credentials = _
56-
private var dfs : FileSystem = _
57-
private var renewer: String = _
58-
private var credentials: Credentials = _
59-
private var tokens: Iterable[Token[_ <: TokenIdentifier]] = _
60-
override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
54+
maybeKeytab: Option[File],
55+
hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging{
56+
private var originalCredentials: Credentials = _
57+
private var dfs : FileSystem = _
58+
private var renewer: String = _
59+
private var credentials: Credentials = _
60+
private var tokens: Iterable[Token[_ <: TokenIdentifier]] = _
61+
62+
override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
6163
val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
6264
logDebug(s"Hadoop Configuration: ${hadoopConf.toString}")
63-
if (!UserGroupInformation.isSecurityEnabled) logError("Hadoop not configuration with Kerberos")
65+
if (!hadoopUGI.isSecurityEnabled) logError("Hadoop not configured with Kerberos")
6466
val maybeJobUserUGI =
6567
for {
6668
principal <- maybePrincipal
@@ -71,12 +73,12 @@ private[spark] class HadoopKerberosKeytabResolverStep(
7173
submissionSparkConf.set("spark.yarn.principal", principal)
7274
submissionSparkConf.set("spark.yarn.keytab", keytab.toURI.toString)
7375
logDebug("Logged into KDC with keytab using Job User UGI")
74-
UserGroupInformation.loginUserFromKeytabAndReturnUGI(
76+
hadoopUGI.loginUserFromKeytabAndReturnUGI(
7577
principal,
7678
keytab.toURI.toString)
7779
}
7880
// In the case that keytab is not specified we will read from Local Ticket Cache
79-
val jobUserUGI = maybeJobUserUGI.getOrElse(UserGroupInformation.getCurrentUser)
81+
val jobUserUGI = maybeJobUserUGI.getOrElse(hadoopUGI.getCurrentUser)
8082
// It is necessary to run as jobUserUGI because logged in user != Current User
8183
jobUserUGI.doAs(new PrivilegedExceptionAction[Void] {
8284
override def run(): Void = {
@@ -92,12 +94,15 @@ private[spark] class HadoopKerberosKeytabResolverStep(
9294
logDebug(s"Renewer is: $renewer")
9395
credentials = new Credentials(originalCredentials)
9496
dfs.addDelegationTokens(renewer, credentials)
97+
// This is difficult to mock and will require refactoring
9598
tokens = credentials.getAllTokens.asScala
9699
logDebug(s"Tokens: ${credentials.toString}")
97100
logDebug(s"All tokens: ${tokens.mkString(",")}")
98101
logDebug(s"All secret keys: ${credentials.getAllSecretKeys}")
99102
null
100103
}})
104+
credentials.getAllTokens.asScala.isEmpty
105+
tokens.isEmpty
101106
if (tokens.isEmpty) logError("Did not obtain any Delegation Tokens")
102107
val data = serialize(credentials)
103108
val renewalTime = getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
1919
import java.io.File
2020

2121
import org.apache.spark.SparkConf
22-
import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, OptionRequirements}
22+
import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, HadoopUGIUtil, OptionRequirements}
2323
import org.apache.spark.deploy.kubernetes.config._
2424
import org.apache.spark.internal.Logging
2525

@@ -39,6 +39,7 @@ private[spark] class HadoopStepsOrchestrator(
3939
private val maybeExistingSecretItemKey =
4040
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
4141
private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir)
42+
private val hadoopUGI = new HadoopUGIUtil
4243
logInfo(s"Hadoop Conf directory: $hadoopConfDir")
4344

4445
require(maybeKeytab.forall( _ => isKerberosEnabled ),
@@ -64,7 +65,8 @@ private[spark] class HadoopStepsOrchestrator(
6465
def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
6566
val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
6667
hadoopConfigMapName,
67-
hadoopConfigurationFiles)
68+
hadoopConfigurationFiles,
69+
hadoopUGI)
6870
val hadoopConfMounterStep = new HadoopConfMounterStep(
6971
hadoopConfigMapName,
7072
hadoopConfigurationFiles,
@@ -79,7 +81,8 @@ private[spark] class HadoopStepsOrchestrator(
7981
new HadoopKerberosKeytabResolverStep(
8082
submissionSparkConf,
8183
maybePrincipal,
82-
maybeKeytab)))
84+
maybeKeytab,
85+
hadoopUGI)))
8386
} else {
8487
Option.empty[HadoopConfigurationStep]
8588
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
8282
val hadoopBootStrap = for {
8383
hadoopConfigMap <- maybeHadoopConfigMap
8484
} yield {
85+
val hadoopUtil = new HadoopUGIUtil
8586
val hadoopConfigurations = maybeHadoopConfDir.map(
8687
conf_dir => getHadoopConfFiles(conf_dir)).getOrElse(Array.empty[File])
8788
new HadoopConfBootstrapImpl(
8889
hadoopConfigMap,
89-
hadoopConfigurations
90+
hadoopConfigurations,
91+
hadoopUtil
9092
)
9193
}
9294
val kerberosBootstrap = for {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/HadoopConfBootstrapSuite.scala

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,34 @@ import scala.collection.JavaConverters._
2323

2424
import com.google.common.io.Files
2525
import io.fabric8.kubernetes.api.model._
26+
import org.mockito.{Mock, MockitoAnnotations}
27+
import org.mockito.Mockito.when
28+
import org.scalatest.BeforeAndAfter
2629

2730
import org.apache.spark.SparkFunSuite
28-
import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, PodWithMainContainer}
31+
import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, HadoopUGIUtil, PodWithMainContainer}
2932
import org.apache.spark.deploy.kubernetes.constants._
3033
import org.apache.spark.util.Utils
3134

32-
33-
private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite {
35+
private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter{
3436
private val CONFIG_MAP_NAME = "config-map"
3537
private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
3638
private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
39+
private val SPARK_USER_VALUE = "sparkUser"
40+
41+
@Mock
42+
private var hadoopUtil: HadoopUGIUtil = _
43+
44+
before {
45+
MockitoAnnotations.initMocks(this)
46+
when(hadoopUtil.getShortName).thenReturn(SPARK_USER_VALUE)
47+
}
3748

3849
test("Test of bootstrapping hadoop_conf_dir files") {
3950
val hadoopConfStep = new HadoopConfBootstrapImpl(
4051
CONFIG_MAP_NAME,
41-
HADOOP_FILES)
52+
HADOOP_FILES,
53+
hadoopUtil)
4254
val expectedKeyPaths = Seq(
4355
new KeyToPathBuilder()
4456
.withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString)
@@ -55,6 +67,7 @@ private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite {
5567
.endVolume()
5668
.endSpec()
5769
.build()
70+
5871
val podWithMain = PodWithMainContainer(
5972
new PodBuilder().withNewSpec().endSpec().build(),
6073
new Container())
@@ -64,7 +77,8 @@ private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite {
6477
(vm.getName, vm.getMountPath)).head === (HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH))
6578
assert(returnedPodContainer.mainContainer.getEnv.asScala.head ===
6679
new EnvVarBuilder().withName(ENV_HADOOP_CONF_DIR).withValue(HADOOP_CONF_DIR_PATH).build())
67-
assert(returnedPodContainer.mainContainer.getEnv.asScala(1).getName === ENV_SPARK_USER)
80+
assert(returnedPodContainer.mainContainer.getEnv.asScala(1) ===
81+
new EnvVarBuilder().withName(ENV_SPARK_USER).withValue(SPARK_USER_VALUE).build())
6882
}
6983
private def createTempFile(contents: String): File = {
7084
val dir = Utils.createTempDir()

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,73 @@ import scala.collection.JavaConverters._
2323

2424
import com.google.common.io.Files
2525
import io.fabric8.kubernetes.api.model._
26+
import org.apache.hadoop.fs.FileSystem
27+
import org.apache.hadoop.io.Text
28+
import org.apache.hadoop.security.Credentials
29+
import org.apache.hadoop.security.UserGroupInformation
30+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
31+
import org.apache.hadoop.security.token.Token
32+
import org.mockito.{Mock, MockitoAnnotations}
33+
import org.mockito.Matchers.any
34+
import org.mockito.Mockito.when
35+
import org.mockito.invocation.InvocationOnMock
36+
import org.mockito.stubbing.Answer
37+
import org.scalatest.BeforeAndAfter
2638

2739
import org.apache.spark.{SparkConf, SparkFunSuite}
40+
import org.apache.spark.deploy.SparkHadoopUtil
41+
import org.apache.spark.deploy.kubernetes.HadoopUGIUtil
2842
import org.apache.spark.deploy.kubernetes.constants._
2943
import org.apache.spark.util.Utils
3044

31-
32-
33-
private[spark] class HadoopKerberosKeytabResolverStepSuite extends SparkFunSuite {
45+
private[spark] class HadoopKerberosKeytabResolverStepSuite
46+
extends SparkFunSuite with BeforeAndAfter{
3447
private val POD_LABEL = Map("bootstrap" -> "true")
3548
private val DRIVER_CONTAINER_NAME = "driver-container"
3649
private val TEMP_KEYTAB_FILE = createTempFile("keytab")
3750
private val KERB_PRINCIPAL = "[email protected]"
51+
private val SPARK_USER_VALUE = "sparkUser"
52+
private var oldCredentials = new Credentials()
53+
private val TEST_IDENTIFIER = "identifier"
54+
private val TEST_PASSWORD = "password"
55+
private val TEST_TOKEN_VALUE = "data"
56+
private def getByteArray(input: String) = input.toCharArray.map(_.toByte)
57+
private def getStringFromArray(input: Array[Byte]) = new String(input)
58+
private val TEST_TOKEN = new Token[AbstractDelegationTokenIdentifier](
59+
getByteArray(TEST_IDENTIFIER),
60+
getByteArray(TEST_PASSWORD),
61+
new Text("kind"),
62+
new Text("service"))
63+
oldCredentials.addToken(new Text("testToken"), TEST_TOKEN)
64+
private val dfs = FileSystem.get(SparkHadoopUtil.get.newConfiguration(new SparkConf()))
65+
private val hadoopUGI = new HadoopUGIUtil()
66+
67+
@Mock
68+
private var hadoopUtil: HadoopUGIUtil = _
69+
70+
@Mock
71+
private var ugi: UserGroupInformation = _
72+
73+
before {
74+
MockitoAnnotations.initMocks(this)
75+
when(hadoopUtil.loginUserFromKeytabAndReturnUGI(any[String], any[String]))
76+
.thenAnswer(new Answer[UserGroupInformation] {
77+
override def answer(invocation: InvocationOnMock): UserGroupInformation = {
78+
hadoopUGI.getCurrentUser
79+
}
80+
})
81+
when(hadoopUtil.getCurrentUser).thenReturn(ugi)
82+
when(hadoopUtil.getShortName).thenReturn(SPARK_USER_VALUE)
83+
when(ugi.getCredentials).thenReturn(oldCredentials)
84+
}
3885

39-
// TODO: Require mocking of UGI methods
4086
test("Testing keytab login") {
87+
when(hadoopUtil.isSecurityEnabled).thenReturn(true)
4188
val keytabStep = new HadoopKerberosKeytabResolverStep(
4289
new SparkConf(),
4390
Some(KERB_PRINCIPAL),
44-
Some(TEMP_KEYTAB_FILE))
91+
Some(TEMP_KEYTAB_FILE),
92+
hadoopUtil)
4593
val hadoopConfSpec = HadoopConfigSpec(
4694
Map.empty[String, String],
4795
new PodBuilder()

0 commit comments

Comments
 (0)