Commit 68dde34

Marcelo Vanzin authored and squito committed
[SPARK-23781][CORE] Merge token renewer functionality into HadoopDelegationTokenManager.
This avoids having two classes to deal with tokens; now the above class is a one-stop shop for dealing with delegation tokens.

The YARN backend extends that class instead of doing composition like before, resulting in a bit less code there too.

The renewer functionality is basically the same code that used to be in YARN's AMCredentialRenewer. That is also the reason why the public API of HadoopDelegationTokenManager is a little bit odd; the YARN AM has some odd requirements for how this all should be initialized, and the weirdness is needed currently to support that.

Tested:
- YARN with stress app for DT renewal
- Mesos and K8S with basic kerberos tests (both tgt and keytab)

Closes apache#22624 from vanzin/SPARK-23781.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
1 parent af3b816 commit 68dde34
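Editor's note: for orientation, here is a minimal sketch (not part of the patch) of how a cluster-manager backend might drive the merged class, using only the public API added in this commit; the configuration values and the commented-out driver endpoint are stand-ins.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager

val sparkConf = new SparkConf()      // assumes a principal and keytab are configured
val hadoopConf = new Configuration()
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)

if (manager.renewalEnabled) {
  // Logs in to the KDC from the keytab, obtains an initial token set, and
  // schedules both the TGT relogin check and the token renewal task.
  val loggedInUser = manager.start()
  // manager.setDriverRef(driverEndpoint)  // once the driver has registered
  // ... run the application as loggedInUser ...
  manager.stop()
}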

File tree

19 files changed: +355 -624 lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -731,7 +731,9 @@ private[spark] object SparkConf extends Logging {
     KEYTAB.key -> Seq(
       AlternateConfig("spark.yarn.keytab", "3.0")),
     PRINCIPAL.key -> Seq(
-      AlternateConfig("spark.yarn.principal", "3.0"))
+      AlternateConfig("spark.yarn.principal", "3.0")),
+    KERBEROS_RELOGIN_PERIOD.key -> Seq(
+      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
   )
 
   /**
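A hedged illustration of what the new AlternateConfig entry buys users (exact deprecation-warning wording aside): the old YARN-only key keeps working and feeds the new spark.kerberos.relogin.period entry.

import org.apache.spark.SparkConf

// Setting the deprecated key still works; Spark logs a deprecation warning
// and the value is picked up when KERBEROS_RELOGIN_PERIOD is read.
val conf = new SparkConf().set("spark.yarn.kerberos.relogin.period", "2m")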

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 0 additions & 14 deletions
@@ -413,20 +413,6 @@ object SparkHadoopUtil {
 
   def get: SparkHadoopUtil = instance
 
-  /**
-   * Given an expiration date for the current set of credentials, calculate the time when new
-   * credentials should be created.
-   *
-   * @param expirationDate Drop-dead expiration date
-   * @param conf Spark configuration
-   * @return Timestamp when new credentials should be created.
-   */
-  private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: SparkConf): Long = {
-    val ct = System.currentTimeMillis
-    val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
-    (ct + (ratio * (expirationDate - ct))).toLong
-  }
-
   /**
    * Returns a Configuration object with Spark configuration applied on top. Unlike
    * the instance method, this will always return a Configuration instance, and not a
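The deleted helper is not lost: its arithmetic reappears inside HadoopDelegationTokenManager.obtainTokensAndScheduleRenewal (next file). A standalone sketch of that computation, assuming the documented default renewal ratio of 0.75:

// Renewal-delay arithmetic, as moved into the token manager (sketch only).
val now = System.currentTimeMillis
val nextRenewal = now + 24L * 60 * 60 * 1000      // e.g. tokens expire in 24h
val ratio = 0.75                                  // CREDENTIALS_RENEWAL_INTERVAL_RATIO default
val delay = (ratio * (nextRenewal - now)).toLong  // 18h: renew at 75% of lifetime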

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 216 additions & 62 deletions
@@ -17,76 +17,158 @@
 
 package org.apache.spark.deploy.security
 
+import java.io.File
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.ThreadUtils
 
 /**
- * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
- * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
- * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
- * explicitly disabled.
+ * Manager for delegation tokens in a Spark application.
+ *
+ * This manager has two modes of operation:
+ *
+ * 1. When configured with a principal and a keytab, it will make sure long-running apps can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
+ * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
+ * when the relogin is not yet needed. The check period can be overridden in the configuration.
  *
- * Also, each HadoopDelegationTokenProvider is controlled by
- * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
- * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
- * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
+ * New delegation tokens are created once 75% of the renewal interval of the original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that might need them.
  *
- * @param sparkConf Spark configuration
- * @param hadoopConf Hadoop configuration
- * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
+ * 2. When operating without an explicit principal and keytab, token renewal will not be available.
+ * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
+ * driver, but the app will not get new tokens when those expire.
+ *
+ * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
+ * method. This option does not require calling the `start` method, but leaves it up to the
+ * caller to distribute the tokens that were generated.
  */
 private[spark] class HadoopDelegationTokenManager(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    fileSystems: Configuration => Set[FileSystem])
-  extends Logging {
+    protected val sparkConf: SparkConf,
+    protected val hadoopConf: Configuration) extends Logging {
 
   private val deprecatedProviderEnabledConfigs = List(
     "spark.yarn.security.tokens.%s.enabled",
     "spark.yarn.security.credentials.%s.enabled")
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
-  // Maintain all the registered delegation token providers
-  private val delegationTokenProviders = getDelegationTokenProviders
+  private val principal = sparkConf.get(PRINCIPAL).orNull
+  private val keytab = sparkConf.get(KEYTAB).orNull
+
+  require((principal == null) == (keytab == null),
+    "Both principal and keytab must be defined, or neither.")
+  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
+
+  private val delegationTokenProviders = loadProviders()
   logDebug("Using the following builtin delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
-  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
-    this(
-      sparkConf,
-      hadoopConf,
-      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+  private var renewalExecutor: ScheduledExecutorService = _
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
+
+  /** Set the endpoint used to send tokens to the driver. */
+  def setDriverRef(ref: RpcEndpointRef): Unit = {
+    driverRef.set(ref)
   }
 
-  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
-    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
-      safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+  /** @return Whether delegation token renewal is enabled. */
+  def renewalEnabled: Boolean = principal != null
 
-    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
-    providers
-      .filter { p => isServiceEnabled(p.serviceName) }
-      .map { p => (p.serviceName, p) }
-      .toMap
+  /**
+   * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will:
+   *
+   * - log in the configured principal, and set up a task to keep that user's ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - send the tokens to the driver, if it's already registered
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * @return The newly logged in user.
+   */
+  def start(): UserGroupInformation = {
+    require(renewalEnabled, "Token renewal must be enabled to start the renewer.")
+    renewalExecutor =
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+    val ugi = doLogin()
+
+    val tgtRenewalTask = new Runnable() {
+      override def run(): Unit = {
+        ugi.checkTGTAndReloginFromKeytab()
+      }
+    }
+    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
+      TimeUnit.SECONDS)
+
+    val creds = obtainTokensAndScheduleRenewal(ugi)
+    ugi.addCredentials(creds)
+
+    val driver = driverRef.get()
+    if (driver != null) {
+      val tokens = SparkHadoopUtil.get.serialize(creds)
+      driver.send(UpdateDelegationTokens(tokens))
+    }
+
+    // Transfer the original user's tokens to the new user, since it may contain needed tokens
+    // (such as those user to connect to YARN). Explicitly avoid overwriting tokens that already
+    // exist in the current user's credentials, since those were freshly obtained above
+    // (see SPARK-23361).
+    val existing = ugi.getCredentials()
+    existing.mergeAll(originalCreds)
+    ugi.addCredentials(existing)
+    ugi
   }
 
-  private def safeCreateProvider(
-      createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
-    try {
-      Some(createFn)
-    } catch {
-      case t: Throwable =>
-        logDebug(s"Failed to load built in provider.", t)
-        None
+  def stop(): Unit = {
+    if (renewalExecutor != null) {
+      renewalExecutor.shutdown()
     }
   }
 
-  def isServiceEnabled(serviceName: String): Boolean = {
+  /**
+   * Fetch new delegation tokens for configured services, storing them in the given credentials.
+   * Tokens are fetched for the current logged in user.
+   *
+   * @param creds Credentials object where to store the delegation tokens.
+   * @return The time by which the tokens must be renewed.
+   */
+  def obtainDelegationTokens(creds: Credentials): Long = {
+    delegationTokenProviders.values.flatMap { provider =>
+      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
+        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
+      } else {
+        logDebug(s"Service ${provider.serviceName} does not require a token." +
+          s" Check your configuration to see if security is disabled or not.")
+        None
+      }
+    }.foldLeft(Long.MaxValue)(math.min)
+  }
+
+  // Visible for testing.
+  def isProviderLoaded(serviceName: String): Boolean = {
+    delegationTokenProviders.contains(serviceName)
+  }
+
+  protected def isServiceEnabled(serviceName: String): Boolean = {
     val key = providerEnabledConfig.format(serviceName)
 
     deprecatedProviderEnabledConfigs.foreach { pattern =>
@@ -110,32 +192,104 @@ private[spark] class HadoopDelegationTokenManager(
   }
 
   /**
-   * Get delegation token provider for the specified service.
+   * List of file systems for which to obtain delegation tokens. The base implementation
+   * returns just the default file system in the given Hadoop configuration.
    */
-  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
-    delegationTokenProviders.get(service)
+  protected def fileSystemsToAccess(): Set[FileSystem] = {
+    Set(FileSystem.get(hadoopConf))
+  }
+
+  private def scheduleRenewal(delay: Long): Unit = {
+    val _delay = math.max(0, delay)
+    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
+
+    val renewalTask = new Runnable() {
+      override def run(): Unit = {
+        updateTokensTask()
+      }
+    }
+    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
   }
 
   /**
-   * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
-   * providers.
-   *
-   * @param hadoopConf hadoop Configuration
-   * @param creds Credentials that will be updated in place (overwritten)
-   * @return Time after which the fetched delegation tokens should be renewed.
+   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
+   * to fetch the next set of tokens when needed.
    */
-  def obtainDelegationTokens(
-      hadoopConf: Configuration,
-      creds: Credentials): Long = {
-    delegationTokenProviders.values.flatMap { provider =>
-      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
-        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
+  private def updateTokensTask(): Unit = {
+    try {
+      val freshUGI = doLogin()
+      val creds = obtainTokensAndScheduleRenewal(freshUGI)
+      val tokens = SparkHadoopUtil.get.serialize(creds)
+
+      val driver = driverRef.get()
+      if (driver != null) {
+        logInfo("Updating delegation tokens.")
+        driver.send(UpdateDelegationTokens(tokens))
       } else {
-        logDebug(s"Service ${provider.serviceName} does not require a token." +
-          s" Check your configuration to see if security is disabled or not.")
-        None
+        // This shouldn't really happen, since the driver should register way before tokens expire.
+        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
       }
-    }.foldLeft(Long.MaxValue)(math.min)
+    } catch {
+      case e: Exception =>
+        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
+        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
+          " If this happens too often tasks will fail.", e)
+        scheduleRenewal(delay)
+    }
+  }
+
+  /**
+   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
+   * new tokens before the new set expires.
+   *
+   * @return Credentials containing the new tokens.
+   */
+  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
+    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
+      override def run(): Credentials = {
+        val creds = new Credentials()
+        val nextRenewal = obtainDelegationTokens(creds)
+
+        // Calculate the time when new credentials should be created, based on the configured
+        // ratio.
+        val now = System.currentTimeMillis
+        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
+        val delay = (ratio * (nextRenewal - now)).toLong
+        scheduleRenewal(delay)
+        creds
+      }
+    })
+  }
+
+  private def doLogin(): UserGroupInformation = {
+    logInfo(s"Attempting to login to KDC using principal: $principal")
+    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+    logInfo("Successfully logged into KDC.")
+    ugi
+  }
+
+  private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
+    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
+      safeCreateProvider(new HiveDelegationTokenProvider) ++
+      safeCreateProvider(new HBaseDelegationTokenProvider)
+
+    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
+    providers
+      .filter { p => isServiceEnabled(p.serviceName) }
+      .map { p => (p.serviceName, p) }
+      .toMap
   }
-}
 
+  private def safeCreateProvider(
+      createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
+    try {
+      Some(createFn)
+    } catch {
+      case t: Throwable =>
+        logDebug(s"Failed to load built in provider.", t)
+        None
    }
+  }
+
+}
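As the new class docs note, the manager can also be used purely as a token factory. A minimal sketch of that mode, assuming a caller that handles distribution itself (sparkConf/hadoopConf as in the earlier sketch):

import org.apache.hadoop.security.Credentials

// Token-only mode: no start(), so nothing is scheduled and nothing is sent
// to the driver; the caller distributes `creds` however it sees fit.
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
val creds = new Credentials()
val mustRenewBy: Long = manager.obtainDelegationTokens(creds)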

core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

Lines changed: 2 additions & 3 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
-private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
+private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem])
   extends HadoopDelegationTokenProvider with Logging {
 
   // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
@@ -44,8 +44,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
       hadoopConf: Configuration,
       sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
-
-    val fsToGetTokens = fileSystems(hadoopConf)
+    val fsToGetTokens = fileSystems()
     val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
 
     // Get the token renewal interval if it is not set. It will only be called once.
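The provider's constructor changes from `Configuration => Set[FileSystem]` to a zero-arg thunk because the manager now owns the Hadoop configuration and passes in its overridable `fileSystemsToAccess()` method. A hedged sketch of how a backend subclass might widen the set (the class name and staging URI are purely illustrative):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager

private[spark] class StagingAwareTokenManager(conf: SparkConf, hadoopConf: Configuration)
    extends HadoopDelegationTokenManager(conf, hadoopConf) {

  // Fetch tokens for an extra (illustrative) staging file system as well.
  override protected def fileSystemsToAccess(): Set[FileSystem] =
    super.fileSystemsToAccess() + FileSystem.get(new URI("hdfs://staging-nn:8020"), hadoopConf)
}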

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 4 additions & 0 deletions
@@ -179,6 +179,10 @@ package object config {
     .doc("Name of the Kerberos principal.")
     .stringConf.createOptional
 
+  private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.kerberos.relogin.period")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("1m")
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional
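Since the entry is declared with `timeConf(TimeUnit.SECONDS)`, reads come back as a number of seconds, which is why `start()` above can hand the value straight to `scheduleAtFixedRate(..., TimeUnit.SECONDS)`. A small sketch (inside Spark, as the entry is private[spark]):

// The default "1m" resolves to 60 seconds.
val tgtRenewalPeriod: Long = sparkConf.get(KERBEROS_RELOGIN_PERIOD)  // 60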
