17
17
package org .apache .spark .deploy .kubernetes .submit .submitsteps .hadoopsteps
18
18
19
19
import java .io ._
20
- import java .security .PrivilegedExceptionAction
20
+
21
+ import scala .collection .JavaConverters ._
22
+ import scala .util .Try
21
23
22
24
import io .fabric8 .kubernetes .api .model .SecretBuilder
23
25
import org .apache .commons .codec .binary .Base64
24
26
import org .apache .hadoop .conf .Configuration
27
+ import org .apache .hadoop .fs .FileSystem
25
28
import org .apache .hadoop .security .{Credentials , UserGroupInformation }
29
+ import org .apache .hadoop .security .token .{Token , TokenIdentifier }
30
+ import org .apache .hadoop .security .token .delegation .AbstractDelegationTokenIdentifier
26
31
27
32
import org .apache .spark .SparkConf
28
33
import org .apache .spark .deploy .SparkHadoopUtil
29
34
import org .apache .spark .deploy .kubernetes .{KerberosConfBootstrapImpl , PodWithMainContainer }
30
35
import org .apache .spark .deploy .kubernetes .constants ._
31
36
import org .apache .spark .internal .Logging
32
37
38
+
39
+
33
40
/**
34
41
* Step that configures the ConfigMap + Volumes for the driver
35
42
*/
@@ -59,20 +66,26 @@ private[spark] class HadoopKerberosKeytabResolverStep(
59
66
}
60
67
// In the case that keytab is not specified we will read from Local Ticket Cache
61
68
val jobUserUGI = maybeJobUserUGI.getOrElse(UserGroupInformation .getCurrentUser)
62
- logInfo(s " Primary group name: ${jobUserUGI.getPrimaryGroupName}" )
63
- val credentials : Credentials = jobUserUGI.getCredentials
64
- val credentialsManager = newHadoopTokenManager(submissionSparkConf, hadoopConf)
65
- var renewalTime = Long .MaxValue
66
- jobUserUGI.doAs(new PrivilegedExceptionAction [Void ] {
67
- override def run (): Void = {
68
- renewalTime = Math .min(
69
- obtainCredentials(credentialsManager, hadoopConf, credentials),
70
- renewalTime)
71
- null
72
- }
73
- })
74
- if (credentials.getAllTokens.isEmpty) logError(" Did not obtain any Delegation Tokens" )
75
- val data = serialize(credentials)
69
+ logInfo(s " Retrieved Job User UGI: $jobUserUGI" )
70
+ val originalCredentials : Credentials = jobUserUGI.getCredentials
71
+ logInfo(s " Original tokens: ${originalCredentials.toString}" )
72
+ logInfo(s " All tokens: ${originalCredentials.getAllTokens}" )
73
+ logInfo(s " All secret keys: ${originalCredentials.getAllSecretKeys}" )
74
+ val dfs : FileSystem = FileSystem .get(hadoopConf)
75
+ // This is not necessary with [Spark-20328] since we would be using
76
+ // Spark core providers to handle delegation token renewal
77
+ val renewer : String = jobUserUGI.getShortUserName
78
+ logInfo(s " Renewer is: $renewer" )
79
+ val renewedCredentials : Credentials = new Credentials (originalCredentials)
80
+ dfs.addDelegationTokens(renewer, renewedCredentials)
81
+ val renewedTokens = renewedCredentials.getAllTokens.asScala
82
+ logInfo(s " Renewed tokens: ${renewedCredentials.toString}" )
83
+ logInfo(s " All renewed tokens: ${renewedTokens}" )
84
+ logInfo(s " All renewed secret keys: ${renewedCredentials.getAllSecretKeys}" )
85
+ if (renewedTokens.isEmpty) logError(" Did not obtain any Delegation Tokens" )
86
+ val data = serialize(renewedCredentials)
87
+ val renewalTime = getTokenRenewalInterval(renewedTokens, hadoopConf)
88
+ .getOrElse(Long .MaxValue )
76
89
val delegationToken = HDFSDelegationToken (data, renewalTime)
77
90
val initialTokenLabelName = s " $KERBEROS_SECRET_LABEL_PREFIX-1- $renewalTime"
78
91
logInfo(s " Storing dt in $initialTokenLabelName" )
@@ -97,24 +110,24 @@ private[spark] class HadoopKerberosKeytabResolverStep(
97
110
dtSecret = Some (secretDT))
98
111
}
99
112
100
- // Functions that should be in SparkHadoopUtil with Rebase to 2.2
113
+ // Functions that should be in Core with Rebase to 2.3
101
114
@ deprecated(" Moved to core in 2.2" , " 2.2" )
102
- private def obtainCredentials (instance : Any , args : AnyRef * ): Long = {
103
- val method = Class
104
- .forName(" org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager" )
105
- .getMethod(" obtainCredentials" , classOf [Configuration ], classOf [Configuration ])
106
- method.setAccessible(true )
107
- method.invoke(instance, args : _* ).asInstanceOf [Long ]
115
+ private def getTokenRenewalInterval (
116
+ renewedTokens : Iterable [Token [_ <: TokenIdentifier ]],
117
+ hadoopConf : Configuration ): Option [Long ] = {
118
+ val renewIntervals = renewedTokens.filter {
119
+ _.decodeIdentifier().isInstanceOf [AbstractDelegationTokenIdentifier ]}
120
+ .flatMap { token =>
121
+ Try {
122
+ val newExpiration = token.renew(hadoopConf)
123
+ val identifier = token.decodeIdentifier().asInstanceOf [AbstractDelegationTokenIdentifier ]
124
+ val interval = newExpiration - identifier.getIssueDate
125
+ logInfo(s " Renewal interval is $interval for token ${token.getKind.toString}" )
126
+ interval
127
+ }.toOption}
128
+ if (renewIntervals.isEmpty) None else Some (renewIntervals.min)
108
129
}
109
- @ deprecated(" Moved to core in 2.2" , " 2.2" )
110
- // This method will instead be using HadoopDelegationTokenManager from Spark 2.2
111
- private def newHadoopTokenManager (args : AnyRef * ): Any = {
112
- val constructor = Class
113
- .forName(" org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager" )
114
- .getConstructor(classOf [SparkConf ], classOf [Configuration ])
115
- constructor.setAccessible(true )
116
- constructor.newInstance(args : _* )
117
- }
130
+
118
131
@ deprecated(" Moved to core in 2.2" , " 2.2" )
119
132
private def serialize (creds : Credentials ): Array [Byte ] = {
120
133
val byteStream = new ByteArrayOutputStream
0 commit comments