17
17
package org .apache .spark .deploy .kubernetes .submit .submitsteps .hadoopsteps
18
18
19
19
import java .io ._
20
+ import java .security .PrivilegedExceptionAction
20
21
21
22
import scala .collection .JavaConverters ._
22
23
import scala .util .Try
@@ -30,6 +31,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
30
31
import org .apache .hadoop .security .token .delegation .AbstractDelegationTokenIdentifier
31
32
32
33
import org .apache .spark .SparkConf
34
+
33
35
import org .apache .spark .deploy .SparkHadoopUtil
34
36
import org .apache .spark .deploy .kubernetes .{KerberosConfBootstrapImpl , PodWithMainContainer }
35
37
import org .apache .spark .deploy .kubernetes .constants ._
@@ -44,9 +46,12 @@ private[spark] class HadoopKerberosKeytabResolverStep(
44
46
submissionSparkConf : SparkConf ,
45
47
maybePrincipal : Option [String ],
46
48
maybeKeytab : Option [File ]) extends HadoopConfigurationStep with Logging {
47
-
48
- override def configureContainers (hadoopConfigSpec : HadoopConfigSpec ): HadoopConfigSpec = {
49
- // FIXME: Pass down hadoopConf so you can call sc.hadoopConfiguration
49
+ private var originalCredentials : Credentials = _
50
+ private var dfs : FileSystem = _
51
+ private var renewer : String = _
52
+ private var renewedCredentials : Credentials = _
53
+ private var renewedTokens : Iterable [Token [_ <: TokenIdentifier ]] = _
54
+ override def configureContainers (hadoopConfigSpec : HadoopConfigSpec ): HadoopConfigSpec = {
50
55
val hadoopConf = SparkHadoopUtil .get.newConfiguration(submissionSparkConf)
51
56
logInfo(s " Hadoop Configuration: ${hadoopConf.toString}" )
52
57
if (! UserGroupInformation .isSecurityEnabled) logError(" Hadoop not configuration with Kerberos" )
@@ -66,26 +71,30 @@ private[spark] class HadoopKerberosKeytabResolverStep(
66
71
}
67
72
// In the case that keytab is not specified we will read from Local Ticket Cache
68
73
val jobUserUGI = maybeJobUserUGI.getOrElse(UserGroupInformation .getCurrentUser)
69
- logInfo(s " Retrieved Job User UGI: $jobUserUGI" )
70
- val originalCredentials : Credentials = jobUserUGI.getCredentials
71
- logInfo(s " Original tokens: ${originalCredentials.toString}" )
72
- logInfo(s " All tokens: ${originalCredentials.getAllTokens}" )
73
- logInfo(s " All secret keys: ${originalCredentials.getAllSecretKeys}" )
74
- val dfs : FileSystem = FileSystem .get(hadoopConf)
75
- // This is not necessary with [Spark-20328] since we would be using
76
- // Spark core providers to handle delegation token renewal
77
- val renewer : String = jobUserUGI.getShortUserName
78
- logInfo(s " Renewer is: $renewer" )
79
- val renewedCredentials : Credentials = new Credentials (originalCredentials)
80
- dfs.addDelegationTokens(renewer, renewedCredentials)
81
- val renewedTokens = renewedCredentials.getAllTokens.asScala
82
- logInfo(s " Renewed tokens: ${renewedCredentials.toString}" )
83
- logInfo(s " All renewed tokens: ${renewedTokens}" )
84
- logInfo(s " All renewed secret keys: ${renewedCredentials.getAllSecretKeys}" )
74
+ // It is necessary to run as jobUserUGI because logged in user != Current User
75
+ jobUserUGI.doAs(new PrivilegedExceptionAction [Void ] {
76
+ override def run (): Void = {
77
+ logInfo(s " Retrieved Job User UGI: $jobUserUGI" )
78
+ originalCredentials = jobUserUGI.getCredentials
79
+ logInfo(s " Original tokens: ${originalCredentials.toString}" )
80
+ logInfo(s " All tokens: ${originalCredentials.getAllTokens}" )
81
+ logInfo(s " All secret keys: ${originalCredentials.getAllSecretKeys}" )
82
+ dfs = FileSystem .get(hadoopConf)
83
+ // This is not necessary with [Spark-20328] since we would be using
84
+ // Spark core providers to handle delegation token renewal
85
+ renewer = jobUserUGI.getShortUserName
86
+ logInfo(s " Renewer is: $renewer" )
87
+ renewedCredentials = new Credentials (originalCredentials)
88
+ dfs.addDelegationTokens(renewer, renewedCredentials)
89
+ renewedTokens = renewedCredentials.getAllTokens.asScala
90
+ logInfo(s " Renewed tokens: ${renewedCredentials.toString}" )
91
+ logInfo(s " All renewed tokens: ${renewedTokens.mkString(" ," )}" )
92
+ logInfo(s " All renewed secret keys: ${renewedCredentials.getAllSecretKeys}" )
93
+ null
94
+ }})
85
95
if (renewedTokens.isEmpty) logError(" Did not obtain any Delegation Tokens" )
86
96
val data = serialize(renewedCredentials)
87
- val renewalTime = getTokenRenewalInterval(renewedTokens, hadoopConf)
88
- .getOrElse(Long .MaxValue )
97
+ val renewalTime = getTokenRenewalInterval(renewedTokens, hadoopConf).getOrElse(Long .MaxValue )
89
98
val delegationToken = HDFSDelegationToken (data, renewalTime)
90
99
val initialTokenLabelName = s " $KERBEROS_SECRET_LABEL_PREFIX-1- $renewalTime"
91
100
logInfo(s " Storing dt in $initialTokenLabelName" )
0 commit comments