-
Notifications
You must be signed in to change notification settings - Fork 117
Implements a server for refreshing HDFS tokens, a part of secure HDFS support. #453
base: branch-2.2-kubernetes
Are you sure you want to change the base?
Changes from 2 commits
7abb5ee
88c0c03
ca0b583
025e2ba
0a7a15d
b3534f1
cbe2777
4f36793
874b8e9
5dc49ca
388063a
1426523
50c3a66
c2ccaa9
a2aec2b
ec70b47
0b049fd
d42c568
5d96879
c0e28d4
57c847e
ce1bb7f
5162339
196cd8a
56ef8e6
93b2acf
ba2e79a
1d74579
95e68d3
a006233
9dc8345
f4d5ee9
1462d2c
193d0f9
eb10c4a
078ac2a
aeb269a
87dedbc
4d1cb74
8997f04
2ed55af
01be03e
0baaf0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,7 +50,7 @@ private class TokenRefreshService(kubernetesClient: KubernetesClient) extends Ac | |
case Relogin => | ||
launchReloginTask() | ||
case StartRefresh(secret) => | ||
addStarterTask(secret) | ||
startRefresh(secret) | ||
case StopRefresh(secret) => | ||
removeRefreshTask(secret) | ||
case UpdateSecretsToTrack(secrets) => | ||
|
@@ -69,7 +69,12 @@ private class TokenRefreshService(kubernetesClient: KubernetesClient) extends Ac | |
scheduler.scheduleOnce(Duration(0L, TimeUnit.MILLISECONDS), task) | ||
} | ||
|
||
private def addStarterTask(secret: Secret) = { | ||
private def startRefresh(secret: Secret) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
recentlyAddedSecretUids.add(getSecretUid(secret.getMetadata)) | ||
addRefreshTask(secret) | ||
} | ||
|
||
private def addRefreshTask(secret: Secret) = { | ||
secretUidToTaskHandle.getOrElseUpdate(getSecretUid(secret.getMetadata), { | ||
val task = new StarterTask(secret, hadoopConf, self, clock) | ||
val cancellable = scheduler.scheduleOnce( | ||
|
@@ -89,13 +94,14 @@ private class TokenRefreshService(kubernetesClient: KubernetesClient) extends Ac | |
} | ||
|
||
private def updateSecretsToTrack(currentSecrets: List[Secret]) : Unit = { | ||
val secretByUid = currentSecrets.map(secret => (getSecretUid(secret.getMetadata), secret)).toMap | ||
val currentUids = secretByUid.keySet | ||
val secretsByUids = currentSecrets.map(secret => (getSecretUid(secret.getMetadata), secret)) | ||
.toMap | ||
val currentUids = secretsByUids.keySet | ||
val priorUids = secretUidToTaskHandle.keySet | ||
val uidsToAdd = currentUids -- priorUids | ||
uidsToAdd.foreach(uid => addStarterTask(secretByUid(uid))) | ||
uidsToAdd.foreach(uid => addRefreshTask(secretsByUids(uid))) | ||
val uidsToRemove = priorUids -- currentUids -- recentlyAddedSecretUids | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where uids get added to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. It seems I forgot to add to this. I'll fix the bug. It will be added whenever we get a StartRefresh command. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
uidsToRemove.foreach(uid => removeRefreshTask(secretByUid(uid))) | ||
uidsToRemove.foreach(uid => removeRefreshTask(secretsByUids(uid))) | ||
recentlyAddedSecretUids.clear() | ||
} | ||
|
||
|
@@ -216,13 +222,16 @@ private class RenewTask(renew: Renew, | |
private def refresh(token: Token[_ <: TokenIdentifier], expireTime: Long, deadline: Long, | ||
nowMillis: Long) = { | ||
val maybeNewToken = maybeObtainNewToken(token, expireTime, nowMillis) | ||
val maybeNewExpireTime = maybeRenewExpireTime(maybeNewToken, expireTime, deadline, nowMillis) | ||
val maybeNewExpireTime = maybeGetNewExpireTime(maybeNewToken, expireTime, deadline, nowMillis) | ||
(maybeNewToken, maybeNewExpireTime) | ||
} | ||
|
||
private def maybeObtainNewToken(token: Token[_ <: TokenIdentifier], expireTime: Long, | ||
nowMills: Long) = { | ||
val maybeNewToken = if (token.getKind.equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this check against There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The line below casts the token identifier to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. |
||
// The token can casted to AbstractDelegationTokenIdentifier below only if the token kind | ||
// is HDFS_DELEGATION_KIND, according to the YARN resource manager code. See if this can be | ||
// generalized beyond HDFS tokens. | ||
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] | ||
val maxDate = identifier.getMaxDate | ||
if (maxDate - expireTime < RENEW_TASK_REMAINING_TIME_BEFORE_NEW_TOKEN_MILLIS || | ||
|
@@ -241,8 +250,9 @@ private class RenewTask(renew: Renew, | |
maybeNewToken | ||
} | ||
|
||
private def maybeRenewExpireTime(token: Token[_ <: TokenIdentifier], expireTime: Long, | ||
deadline: Long, nowMillis: Long) = { | ||
private def maybeGetNewExpireTime(token: Token[_ <: TokenIdentifier], expireTime: Long, | ||
deadline: Long, | ||
nowMillis: Long) = { | ||
if (expireTime <= deadline || expireTime <= nowMillis) { | ||
try { | ||
logDebug(s"Renewing token $token with current expire time $expireTime," + | ||
|
@@ -301,7 +311,9 @@ private class RenewTask(renew: Renew, | |
editor.addToData(key, value) | ||
val dataItemKeys = editor.getData.keySet().asScala.filter( | ||
_.startsWith(SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS)).toSeq.sorted | ||
// Remove data items except the latest two data items. | ||
// Remove data items except the latest two data items. A K8s secret can hold only up to 1 MB | ||
// data. We need to remove old data items. We keep the latest two items to avoid race conditions | ||
// where some newly launching executors may access the previous token. | ||
dataItemKeys.dropRight(2).foreach(editor.removeFromData) | ||
editor.done | ||
logInfo(s"Wrote new tokens $tokenToExpire to a data item $key in ${secretMeta.getSelfLink}") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
accessing/that access
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.