Skip to content

Commit 486b991

Browse files
authored
KAFKA-18711 Move DelegationTokenPublisher to metadata module (#20475)
Basically, one of the refactor tasks. In this PR, I have moved `DelegationTokenPublisher` to the metadata module. Similar to the `ScramPublisher` migration (commit feee50f), I have moved `DelegationTokenManager` to the server-common module, as it would otherwise create a circular dependency. Moreover, I have made multiple changes throughout the codebase to reference `DelegationTokenManager` from server-common instead of the server module. Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 8036e49 commit 486b991

File tree

12 files changed

+97
-100
lines changed

12 files changed

+97
-100
lines changed

checkstyle/import-control-server-common.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
<allow pkg="javax.net.ssl" />
3434
<allow pkg="javax.security" />
3535
<allow pkg="net.jqwik.api" />
36+
<allow pkg="javax.crypto" />
3637

3738
<!-- no one depends on the server -->
3839
<disallow pkg="kafka" />
@@ -49,6 +50,9 @@
4950
<!-- persistent collection factories/non-library-specific wrappers -->
5051
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
5152

53+
<!-- allow config classes for server package -->
54+
<allow pkg="org.apache.kafka.server.config" />
55+
5256
<subpackage name="queue">
5357
<allow pkg="org.apache.kafka.test" />
5458
</subpackage>

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import org.apache.kafka.coordinator.share.ShareCoordinator;
3737
import org.apache.kafka.metadata.ConfigRepository;
3838
import org.apache.kafka.metadata.MetadataCache;
39+
import org.apache.kafka.security.DelegationTokenManager;
3940
import org.apache.kafka.server.ApiVersionManager;
4041
import org.apache.kafka.server.ClientMetricsManager;
41-
import org.apache.kafka.server.DelegationTokenManager;
4242
import org.apache.kafka.server.authorizer.Authorizer;
4343
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
4444

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
4242
import org.apache.kafka.coordinator.transaction.ProducerIdManager
4343
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
4444
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
45-
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
46-
import org.apache.kafka.security.CredentialProvider
45+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
46+
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
4747
import org.apache.kafka.server.authorizer.Authorizer
4848
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
4949
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
@@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
5454
import org.apache.kafka.server.share.session.ShareSessionCache
5555
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
5656
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
57-
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
57+
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, ProcessRole}
5858
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
5959
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
6060
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -502,7 +502,7 @@ class BrokerServer(
502502
"broker",
503503
credentialProvider),
504504
new DelegationTokenPublisher(
505-
config,
505+
config.nodeId,
506506
sharedServer.metadataPublishingFaultHandler,
507507
"broker",
508508
tokenManager),

core/src/main/scala/kafka/server/ControllerApis.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
5555
import org.apache.kafka.common.security.auth.KafkaPrincipal
5656
import org.apache.kafka.common.security.auth.SecurityProtocol
5757
import org.apache.kafka.raft.RaftManager
58-
import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole}
58+
import org.apache.kafka.security.DelegationTokenManager
59+
import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
5960
import org.apache.kafka.server.authorizer.Authorizer
6061
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
6162
import org.apache.kafka.server.quota.ControllerMutationQuota

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
2222
import kafka.server.QuotaFactory.QuotaManagers
2323

2424
import scala.collection.immutable
25-
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
25+
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
2626
import kafka.utils.{CoreUtils, Logging}
2727
import org.apache.kafka.common.internals.Plugin
2828
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,14 +38,15 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
3838
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
3939
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
4040
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
41-
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
41+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher}
4242
import org.apache.kafka.raft.QuorumConfig
43-
import org.apache.kafka.security.CredentialProvider
44-
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
43+
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
44+
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
4545
import org.apache.kafka.server.authorizer.Authorizer
4646
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
4747
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
48-
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
48+
import org.apache.kafka.server.config.ConfigType
49+
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
4950
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
5051
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
5152
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -360,7 +361,7 @@ class ControllerServer(
360361
// We need a tokenManager for the Publisher
361362
// The tokenCache in the tokenManager is the same used in DelegationTokenControlManager
362363
metadataPublishers.add(new DelegationTokenPublisher(
363-
config,
364+
config.nodeId,
364365
sharedServer.metadataPublishingFaultHandler,
365366
"controller",
366367
new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
6060
import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
6161
import org.apache.kafka.coordinator.share.ShareCoordinator
6262
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
63-
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole}
63+
import org.apache.kafka.security.DelegationTokenManager
64+
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, ProcessRole}
6465
import org.apache.kafka.server.authorizer._
6566
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
6667
import org.apache.kafka.server.config.DelegationTokenManagerConfigs

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3333
import org.apache.kafka.image.loader.LoaderManifest
3434
import org.apache.kafka.image.publisher.MetadataPublisher
3535
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
36-
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
36+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
3737
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
3838
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
3939
import org.apache.kafka.server.fault.FaultHandler
@@ -227,7 +227,7 @@ class BrokerMetadataPublisher(
227227
scramPublisher.onMetadataUpdate(delta, newImage, manifest)
228228

229229
// Apply DelegationToken delta.
230-
delegationTokenPublisher.onMetadataUpdate(delta, newImage)
230+
delegationTokenPublisher.onMetadataUpdate(delta, newImage, manifest)
231231

232232
// Apply ACL delta.
233233
aclPublisher.onMetadataUpdate(delta, newImage, manifest)

core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala

Lines changed: 0 additions & 83 deletions
This file was deleted.

core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
4040
import org.apache.kafka.coordinator.share.ShareCoordinator
4141
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
4242
import org.apache.kafka.image.loader.LogDeltaManifest
43-
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
43+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
4444
import org.apache.kafka.network.SocketServerConfigs
4545
import org.apache.kafka.raft.LeaderAndEpoch
4646
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.metadata.publisher;
18+
19+
import org.apache.kafka.image.DelegationTokenImage;
20+
import org.apache.kafka.image.MetadataDelta;
21+
import org.apache.kafka.image.MetadataImage;
22+
import org.apache.kafka.image.loader.LoaderManifest;
23+
import org.apache.kafka.image.publisher.MetadataPublisher;
24+
import org.apache.kafka.security.DelegationTokenManager;
25+
import org.apache.kafka.server.fault.FaultHandler;
26+
27+
public class DelegationTokenPublisher implements MetadataPublisher {
28+
private final int nodeId;
29+
private final FaultHandler faultHandler;
30+
private final String nodeType;
31+
private final DelegationTokenManager tokenManager;
32+
private boolean firstPublish = true;
33+
34+
public DelegationTokenPublisher(int nodeId, FaultHandler faultHandler, String nodeType, DelegationTokenManager tokenManager) {
35+
this.nodeId = nodeId;
36+
this.faultHandler = faultHandler;
37+
this.nodeType = nodeType;
38+
this.tokenManager = tokenManager;
39+
}
40+
41+
@Override
42+
public final String name() {
43+
return "DelegationTokenPublisher " + nodeType + " id=" + nodeId;
44+
}
45+
46+
@Override
47+
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
48+
var first = firstPublish;
49+
try {
50+
if (firstPublish) {
51+
// Initialize the tokenCache with the Image
52+
DelegationTokenImage delegationTokenImage = newImage.delegationTokens();
53+
for (var token : delegationTokenImage.tokens().entrySet()) {
54+
tokenManager.updateToken(tokenManager.getDelegationToken(token.getValue().tokenInformation()));
55+
}
56+
firstPublish = false;
57+
}
58+
// Apply changes to DelegationTokens.
59+
for (var token : delta.getOrCreateDelegationTokenDelta().changes().entrySet()) {
60+
var tokenId = token.getKey();
61+
var delegationTokenData = token.getValue();
62+
if (delegationTokenData.isPresent())
63+
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()));
64+
else
65+
tokenManager.removeToken(tokenId);
66+
}
67+
} catch (Throwable t) {
68+
var msg = String.format("Uncaught exception while publishing DelegationToken changes from %s MetadataDelta up to %s",
69+
first ? "initial" : "update", newImage.highestOffsetAndEpoch().offset());
70+
faultHandler.handleFault(msg, t);
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)