From aa2bbe88ed1e1fa6fcec9eaaf623354963e0325f Mon Sep 17 00:00:00 2001 From: see-quick Date: Mon, 6 Oct 2025 15:39:22 +0200 Subject: [PATCH 1/2] KAFKA-18710: Move DynamicClientQuotaPublisher to metadata module Signed-off-by: see-quick --- .../scala/kafka/server/BrokerServer.scala | 4 +- .../scala/kafka/server/ControllerServer.scala | 6 +- .../metadata/BrokerMetadataPublisher.scala | 4 +- .../metadata/ClientQuotaMetadataManager.scala | 3 +- .../DynamicClientQuotaPublisher.scala | 59 ----------------- .../BrokerMetadataPublisherTest.scala | 2 +- .../publisher/ClientQuotaUpdater.java | 31 +++++++++ .../DynamicClientQuotaPublisher.java | 65 +++++++++++++++++++ 8 files changed, 106 insertions(+), 68 deletions(-) delete mode 100644 core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a9217c4d0239b..9404c2b216707 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec import org.apache.kafka.coordinator.transaction.ProducerIdManager import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator} -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition} @@ -487,7 +487,7 @@ class BrokerServer( dynamicConfigHandlers.toMap, "broker"), new DynamicClientQuotaPublisher( - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "broker", clientQuotaMetadataManager, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index e41705ed3bae9..4eb0de9f5ec58 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} +import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -333,8 +333,8 @@ class ControllerServer( // Set up the client quotas publisher. This will enable controller mutation quotas and any // other quotas which are applicable. - metadataPublishers.add(new DynamicClientQuotaPublisher( - config, + metadataPublishers.add(new org.apache.kafka.metadata.publisher.DynamicClientQuotaPublisher( + config.nodeId, sharedServer.metadataPublishingFaultHandler, "controller", clientQuotaMetadataManager diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 8df8a27558008..7364eef518817 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.publisher.MetadataPublisher import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion} import org.apache.kafka.server.fault.FaultHandler @@ -218,7 +218,7 @@ class BrokerMetadataPublisher( dynamicConfigPublisher.onMetadataUpdate(delta, newImage) // Apply client quotas delta. - dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage) + dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest) // Apply topic or cluster quotas delta. dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage) diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala index cda7661907dd9..a4d81798c4008 100644 --- a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala +++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Sanitizer import java.net.{InetAddress, UnknownHostException} import java.util.Optional import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta} +import org.apache.kafka.metadata.publisher.ClientQuotaUpdater import org.apache.kafka.server.config.QuotaConfig import org.apache.kafka.server.quota.ClientQuotaManager @@ -51,7 +52,7 @@ case object DefaultUserDefaultClientIdEntity extends QuotaEntity * Process quota metadata records as they appear in the metadata log and update quota managers and cache as necessary */ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManagers, - private[metadata] val connectionQuotas: ConnectionQuotas) extends Logging { + private[metadata] val connectionQuotas: ConnectionQuotas) extends ClientQuotaUpdater with Logging { def update(quotasDelta: ClientQuotasDelta): Unit = { quotasDelta.changes().forEach { (key, value) => diff --git a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala deleted file mode 100644 index 94aaebf00a652..0000000000000 --- a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server.metadata - -import kafka.server.KafkaConfig -import kafka.utils.Logging -import org.apache.kafka.image.loader.LoaderManifest -import org.apache.kafka.image.{MetadataDelta, MetadataImage} -import org.apache.kafka.server.fault.FaultHandler - - -class DynamicClientQuotaPublisher( - conf: KafkaConfig, - faultHandler: FaultHandler, - nodeType: String, - clientQuotaMetadataManager: ClientQuotaMetadataManager, -) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { - logIdent = s"[${name()}] " - - override def name(): String = s"DynamicClientQuotaPublisher $nodeType id=${conf.nodeId}" - - override def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - manifest: LoaderManifest - ): Unit = { - onMetadataUpdate(delta, newImage) - } - - def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - ): Unit = { - val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" - try { - Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta => - clientQuotaMetadataManager.update(clientQuotasDelta) - } - } catch { - case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing dynamic client quota changes from $deltaName", t) - } - } -} diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 32727a4c3cc7c..36a94edf9bd55 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage} import org.apache.kafka.image.loader.LogDeltaManifest -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.LeaderAndEpoch import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java new file mode 100644 index 0000000000000..97e4f380db1b4 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.image.ClientQuotasDelta; + +/** + * Interface for updating client quotas based on metadata changes. + */ +public interface ClientQuotaUpdater { + /** + * Update client quotas based on the given delta. + * + * @param quotasDelta The client quotas delta to apply + */ + void update(ClientQuotasDelta quotasDelta); +} \ No newline at end of file diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java new file mode 100644 index 0000000000000..910894acce163 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.ClientQuotasDelta; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.server.fault.FaultHandler; + +/** + * Publishes dynamic client quota changes to the client quota metadata manager. + */ +public class DynamicClientQuotaPublisher implements MetadataPublisher { + private final int nodeId; + private final FaultHandler faultHandler; + private final String nodeType; + private final ClientQuotaUpdater clientQuotaUpdater; + + public DynamicClientQuotaPublisher( + int nodeId, + FaultHandler faultHandler, + String nodeType, + ClientQuotaUpdater clientQuotaUpdater + ) { + this.nodeId = nodeId; + this.faultHandler = faultHandler; + this.nodeType = nodeType; + this.clientQuotaUpdater = clientQuotaUpdater; + } + + @Override + public String name() { + return "DynamicClientQuotaPublisher " + nodeType + " id=" + nodeId; + } + + @Override + public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { + String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset(); + ClientQuotasDelta clientQuotasDelta = delta.clientQuotasDelta(); + if (clientQuotasDelta != null) { + try { + clientQuotaUpdater.update(clientQuotasDelta); + } catch (Throwable t) { + faultHandler.handleFault("Uncaught exception while publishing dynamic client quota changes from " + deltaName, t); + } + } + } +} \ No newline at end of file From 6a0a7659676eb39a18b4abbe1ba290f669d5b11a Mon Sep 17 00:00:00 2001 From: see-quick Date: Mon, 6 Oct 2025 15:41:02 +0200 Subject: [PATCH 2/2] add eof and import fixes Signed-off-by: see-quick --- core/src/main/scala/kafka/server/ControllerServer.scala | 4 ++-- .../apache/kafka/metadata/publisher/ClientQuotaUpdater.java | 2 +- .../kafka/metadata/publisher/DynamicClientQuotaPublisher.java | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 4eb0de9f5ec58..f4802adaf8ff1 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager} @@ -333,7 +333,7 @@ class ControllerServer( // Set up the client quotas publisher. This will enable controller mutation quotas and any // other quotas which are applicable. - metadataPublishers.add(new org.apache.kafka.metadata.publisher.DynamicClientQuotaPublisher( + metadataPublishers.add(new DynamicClientQuotaPublisher( config.nodeId, sharedServer.metadataPublishingFaultHandler, "controller", diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java index 97e4f380db1b4..b8cf7ff073da5 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/ClientQuotaUpdater.java @@ -28,4 +28,4 @@ public interface ClientQuotaUpdater { * @param quotasDelta The client quotas delta to apply */ void update(ClientQuotasDelta quotasDelta); -} \ No newline at end of file +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java index 910894acce163..a8213fad1a4b0 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicClientQuotaPublisher.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.metadata.publisher; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.image.ClientQuotasDelta; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -62,4 +61,4 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, Loader } } } -} \ No newline at end of file +}