Skip to content

Commit a244565

Browse files
authored
KAFKA-18708: Move ScramPublisher to metadata module (apache#20468)
Reviewers: Mickael Maison <[email protected]>
1 parent 32b8e32 commit a244565

File tree

9 files changed

+78
-78
lines changed

9 files changed

+78
-78
lines changed

checkstyle/import-control-metadata.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@
161161
<allow pkg="org.apache.kafka.metadata" />
162162
<allow pkg="org.apache.kafka.queue" />
163163
<allow pkg="org.apache.kafka.raft" />
164+
<allow pkg="org.apache.kafka.security" />
164165
<allow pkg="org.apache.kafka.server.authorizer" />
165166
<allow pkg="org.apache.kafka.server.common" />
166167
<allow pkg="org.apache.kafka.server.fault" />

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
</subpackage>
6161

6262
<subpackage name="security">
63+
<allow pkg="org.apache.kafka.clients.admin" />
6364
<allow pkg="org.apache.kafka.common.config" />
6465
<allow pkg="org.apache.kafka.common.config.types" />
6566
<allow pkg="org.apache.kafka.server.util" />

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
4242
import org.apache.kafka.coordinator.transaction.ProducerIdManager
4343
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
4444
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
45-
import org.apache.kafka.metadata.publisher.AclPublisher
45+
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
4646
import org.apache.kafka.security.CredentialProvider
4747
import org.apache.kafka.server.authorizer.Authorizer
4848
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -497,7 +497,7 @@ class BrokerServer(
497497
quotaManagers,
498498
),
499499
new ScramPublisher(
500-
config,
500+
config.nodeId,
501501
sharedServer.metadataPublishingFaultHandler,
502502
"broker",
503503
credentialProvider),

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
2222
import kafka.server.QuotaFactory.QuotaManagers
2323

2424
import scala.collection.immutable
25-
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
25+
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
2626
import kafka.utils.{CoreUtils, Logging}
2727
import org.apache.kafka.common.internals.Plugin
2828
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
3838
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
3939
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
4040
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
41-
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
41+
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
4242
import org.apache.kafka.raft.QuorumConfig
4343
import org.apache.kafka.security.CredentialProvider
4444
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
@@ -350,7 +350,7 @@ class ControllerServer(
350350

351351
// Set up the SCRAM publisher.
352352
metadataPublishers.add(new ScramPublisher(
353-
config,
353+
config.nodeId,
354354
sharedServer.metadataPublishingFaultHandler,
355355
"controller",
356356
credentialProvider

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3333
import org.apache.kafka.image.loader.LoaderManifest
3434
import org.apache.kafka.image.publisher.MetadataPublisher
3535
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
36-
import org.apache.kafka.metadata.publisher.AclPublisher
36+
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
3737
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
3838
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
3939
import org.apache.kafka.server.fault.FaultHandler

core/src/main/scala/kafka/server/metadata/ScramPublisher.scala

Lines changed: 0 additions & 71 deletions
This file was deleted.

core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
4040
import org.apache.kafka.coordinator.share.ShareCoordinator
4141
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
4242
import org.apache.kafka.image.loader.LogDeltaManifest
43-
import org.apache.kafka.metadata.publisher.AclPublisher
43+
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
4444
import org.apache.kafka.network.SocketServerConfigs
4545
import org.apache.kafka.raft.LeaderAndEpoch
4646
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.metadata.publisher;
18+
19+
import org.apache.kafka.image.MetadataDelta;
20+
import org.apache.kafka.image.MetadataImage;
21+
import org.apache.kafka.image.ScramDelta;
22+
import org.apache.kafka.image.loader.LoaderManifest;
23+
import org.apache.kafka.image.publisher.MetadataPublisher;
24+
import org.apache.kafka.security.CredentialProvider;
25+
import org.apache.kafka.server.fault.FaultHandler;
26+
27+
public class ScramPublisher implements MetadataPublisher {
28+
private final int nodeId;
29+
private final FaultHandler faultHandler;
30+
private final String nodeType;
31+
private final CredentialProvider credentialProvider;
32+
33+
public ScramPublisher(int nodeId, FaultHandler faultHandler, String nodeType, CredentialProvider credentialProvider) {
34+
this.nodeId = nodeId;
35+
this.faultHandler = faultHandler;
36+
this.nodeType = nodeType;
37+
this.credentialProvider = credentialProvider;
38+
}
39+
40+
@Override
41+
public final String name() {
42+
return "ScramPublisher " + nodeType + " id=" + nodeId;
43+
}
44+
45+
@Override
46+
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
47+
onMetadataUpdate(delta, newImage);
48+
}
49+
50+
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
51+
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
52+
try {
53+
// Apply changes to SCRAM credentials.
54+
ScramDelta scramDelta = delta.scramDelta();
55+
if (scramDelta != null) {
56+
scramDelta.changes().forEach((mechanism, userChanges) -> {
57+
userChanges.forEach((userName, change) -> {
58+
if (change.isPresent())
59+
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential());
60+
else
61+
credentialProvider.removeCredentials(mechanism, userName);
62+
});
63+
});
64+
}
65+
} catch (Throwable t) {
66+
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from " + deltaName, t);
67+
}
68+
}
69+
}
File renamed without changes.

0 commit comments

Comments
 (0)