Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
Expand Down Expand Up @@ -487,7 +487,7 @@ class BrokerServer(
dynamicConfigHandlers.toMap,
"broker"),
new DynamicClientQuotaPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
clientQuotaMetadataManager,
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand All @@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
Expand Down Expand Up @@ -334,7 +334,7 @@ class ControllerServer(
// Set up the client quotas publisher. This will enable controller mutation quotas and any
// other quotas which are applicable.
metadataPublishers.add(new DynamicClientQuotaPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
clientQuotaMetadataManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
Expand Down Expand Up @@ -218,7 +218,7 @@ class BrokerMetadataPublisher(
dynamicConfigPublisher.onMetadataUpdate(delta, newImage)

// Apply client quotas delta.
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest)

// Apply topic or cluster quotas delta.
dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Sanitizer
import java.net.{InetAddress, UnknownHostException}
import java.util.Optional
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.metadata.publisher.ClientQuotaUpdater
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.quota.ClientQuotaManager

Expand All @@ -51,7 +52,7 @@ case object DefaultUserDefaultClientIdEntity extends QuotaEntity
* Process quota metadata records as they appear in the metadata log and update quota managers and cache as necessary
*/
class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManagers,
private[metadata] val connectionQuotas: ConnectionQuotas) extends Logging {
private[metadata] val connectionQuotas: ConnectionQuotas) extends ClientQuotaUpdater with Logging {

def update(quotasDelta: ClientQuotasDelta): Unit = {
quotasDelta.changes().forEach { (key, value) =>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;

import org.apache.kafka.image.ClientQuotasDelta;

/**
* Interface for updating client quotas based on metadata changes.
*/
public interface ClientQuotaUpdater {
/**
* Update client quotas based on the given delta.
*
* @param quotasDelta The client quotas delta to apply
*/
void update(ClientQuotasDelta quotasDelta);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;

import org.apache.kafka.image.ClientQuotasDelta;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.fault.FaultHandler;

/**
* Publishes dynamic client quota changes to the client quota metadata manager.
*/
public class DynamicClientQuotaPublisher implements MetadataPublisher {
private final int nodeId;
private final FaultHandler faultHandler;
private final String nodeType;
private final ClientQuotaUpdater clientQuotaUpdater;

public DynamicClientQuotaPublisher(
int nodeId,
FaultHandler faultHandler,
String nodeType,
ClientQuotaUpdater clientQuotaUpdater
) {
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.clientQuotaUpdater = clientQuotaUpdater;
}

@Override
public String name() {
return "DynamicClientQuotaPublisher " + nodeType + " id=" + nodeId;
}

@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
ClientQuotasDelta clientQuotasDelta = delta.clientQuotasDelta();
if (clientQuotasDelta != null) {
try {
clientQuotaUpdater.update(clientQuotasDelta);
} catch (Throwable t) {
faultHandler.handleFault("Uncaught exception while publishing dynamic client quota changes from " + deltaName, t);
}
}
}
}
Loading