diff --git a/bin/kafka-configs.sh b/bin/kafka-configs.sh
index 2f9eb8c239f59..1c8943844972d 100755
--- a/bin/kafka-configs.sh
+++ b/bin/kafka-configs.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConfigCommand "$@"
diff --git a/bin/windows/kafka-configs.bat b/bin/windows/kafka-configs.bat
index 3792a5d9b7e17..09ee9ce8e87e9 100644
--- a/bin/windows/kafka-configs.bat
+++ b/bin/windows/kafka-configs.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
-"%~dp0kafka-run-class.bat" kafka.admin.ConfigCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConfigCommand %*
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 1c07ec17cd4f6..1b6b6ab0edd67 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -46,7 +46,6 @@
-
@@ -67,22 +66,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f8ff30c3eb92c..a9fb1cdc71d6a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -293,7 +293,6 @@
-
@@ -302,6 +301,7 @@
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 58e7723551575..53dbac88113d1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -235,11 +235,11 @@
+ files="(AclCommand|ConfigCommand|ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
+ files="(AclCommand|ConfigCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader|ConsoleProducer|DumpLogSegments).java"/>
alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
- "--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
- try (Admin client = clusterInstance.admin()) {
- ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(Set.of(alterOpts)));
-
- Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));
- assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
- }
- }
-
@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
Map configs = new HashMap<>();
@@ -152,10 +134,6 @@ public void testMetrics(ClusterInstance clusterInstance) {
}
}
- private static String[] toArray(Collection> lists) {
- return lists.stream().flatMap(List::stream).toArray(String[]::new);
- }
-
@SuppressWarnings("unused")
public static class TelemetryExporter implements ClientTelemetryExporterProvider, MetricsReporter {
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
deleted file mode 100644
index 0166a64e67f61..0000000000000
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ /dev/null
@@ -1,755 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple._
-import kafka.utils.Implicits._
-import kafka.utils.Logging
-import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidConfigurationException, UnsupportedVersionException}
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.scram.internals.ScramMechanism
-import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.coordinator.group.GroupConfig
-import org.apache.kafka.server.config.{ConfigType, DynamicConfig, QuotaConfig}
-import org.apache.kafka.server.metrics.ClientMetricsConfigs
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.kafka.storage.internals.log.LogConfig
-
-import java.net.{InetAddress, UnknownHostException}
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.{ExecutionException, TimeUnit}
-import java.util.{Collections, Properties}
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-/**
- * This script can be used to change configs for topics/clients/users/brokers/ips/client-metrics/groups dynamically
- * An entity described or altered by the command may be one of:
- *
- * - topic: --topic OR --entity-type topics --entity-name
- *
- client: --client OR --entity-type clients --entity-name
- *
- user: --user OR --entity-type users --entity-name
- *
- : --user --client OR
- * --entity-type users --entity-name --entity-type clients --entity-name
- *
- broker: --broker OR --entity-type brokers --entity-name
- *
- broker-logger: --broker-logger OR --entity-type broker-loggers --entity-name
- *
- ip: --ip OR --entity-type ips --entity-name
- *
- client-metrics: --client-metrics OR --entity-type client-metrics --entity-name
- *
- group: --group OR --entity-type groups --entity-name
- *
- * --entity-type --entity-default may be specified in place of --entity-type --entity-name
- * when describing or altering default configuration for users, clients, brokers, or ips, respectively.
- * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or --ip-defaults may be specified in place of
- * --entity-type --entity-default, respectively.
- */
-object ConfigCommand extends Logging {
-
- private val BrokerDefaultEntityName = ""
- val BrokerLoggerConfigType = "broker-loggers"
- private val BrokerSupportedConfigTypes = ConfigType.values.map(_.value) :+ BrokerLoggerConfigType
- private val DefaultScramIterations = 4096
- private val TopicType = ConfigType.TOPIC.value
- private val ClientMetricsType = ConfigType.CLIENT_METRICS.value
- private val BrokerType = ConfigType.BROKER.value
- private val GroupType = ConfigType.GROUP.value
- private val UserType = ConfigType.USER.value
- private val ClientType = ConfigType.CLIENT.value
- private val IpType = ConfigType.IP.value
-
- def main(args: Array[String]): Unit = {
- try {
- val opts = new ConfigCommandOptions(args)
-
- CommandLineUtils.maybePrintHelpOrVersion(opts,
- "This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip, client-metrics or group")
-
- opts.checkArgs()
- processCommand(opts)
- } catch {
- case e: UnsupportedVersionException =>
- logger.debug(s"Unsupported API encountered in server when executing config command with args '${args.mkString(" ")}'")
- System.err.println(e.getMessage)
- Exit.exit(1)
-
- case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) =>
- logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e)
- System.err.println(e.getMessage)
- Exit.exit(1)
-
- case t: Throwable =>
- logger.debug(s"Error while executing config command with args '${args.mkString(" ")}'", t)
- System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'")
- t.printStackTrace(System.err)
- Exit.exit(1)
- }
- }
-
- def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
- val props = new Properties
- if (opts.options.has(opts.addConfigFile)) {
- val file = opts.options.valueOf(opts.addConfigFile)
- props ++= Utils.loadProps(file)
- }
- if (opts.options.has(opts.addConfig)) {
- // Split list by commas, but avoid those in [], then into KV pairs
- // Each KV pair is of format key=value, split them into key and value, using -1 as the limit for split() to
- // include trailing empty strings. This is to support empty value (e.g. 'ssl.endpoint.identification.algorithm=')
- val pattern = "(?=[^\\]]*(?:\\[|$))"
- val configsToBeAdded = opts.options.valueOf(opts.addConfig)
- .split("," + pattern)
- .map(_.split("""\s*=\s*""" + pattern, -1))
- require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\" or \"key=[val1,val2]\" to group values which contain commas.")
- //Create properties, parsing square brackets from values if necessary
- configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
- }
- validatePropsKey(props)
- props
- }
-
- def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
- if (opts.options.has(opts.deleteConfig)) {
- val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).asScala.map(_.trim())
- configsToBeDeleted
- }
- else
- Seq.empty
- }
-
- private def validatePropsKey(props: Properties): Unit = {
- props.keySet.forEach { propsKey =>
- // Allows the '$' symbol to support valid logger names for internal classes (e.g. org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper)
- if (!propsKey.toString.matches("[$a-zA-Z0-9._-]*")) {
- throw new IllegalArgumentException(
- s"Invalid character found for config key: $propsKey"
- )
- }
- }
- }
-
- private def processCommand(opts: ConfigCommandOptions): Unit = {
- val props = if (opts.options.has(opts.commandConfigOpt))
- Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
- else
- new Properties()
- CommandLineUtils.initializeBootstrapProperties(opts.parser,
- opts.options,
- props,
- opts.bootstrapServerOpt,
- opts.bootstrapControllerOpt)
- val adminClient = Admin.create(props)
-
- if (opts.options.has(opts.alterOpt) && opts.entityTypes.size != opts.entityNames.size)
- throw new IllegalArgumentException(s"An entity name must be specified for every entity type")
-
- try {
- if (opts.options.has(opts.alterOpt))
- alterConfig(adminClient, opts)
- else if (opts.options.has(opts.describeOpt))
- describeConfig(adminClient, opts)
- } finally {
- adminClient.close()
- }
- }
-
- def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
- val entityTypes = opts.entityTypes
- val entityNames = opts.entityNames
- val entityTypeHead = entityTypes.head
- val entityNameHead = entityNames.head
- val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
- val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
- val configsToBeDeleted = parseConfigsToBeDeleted(opts)
-
- entityTypeHead match {
- case TopicType | ClientMetricsType | BrokerType | GroupType =>
- val configResourceType = entityTypeHead match {
- case TopicType => ConfigResource.Type.TOPIC
- case ClientMetricsType => ConfigResource.Type.CLIENT_METRICS
- case BrokerType => ConfigResource.Type.BROKER
- case GroupType => ConfigResource.Type.GROUP
- case _ => throw new IllegalArgumentException(s"$entityNameHead is not a valid entity-type.")
- }
- try {
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
- } catch {
- case e: ExecutionException =>
- e.getCause match {
- case _: UnsupportedVersionException =>
- throw new UnsupportedVersionException(s"The ${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The API is supported starting from version 2.3.0."
- + " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.")
- case _ => throw e
- }
- case e: Throwable => throw e
- }
-
- case BrokerLoggerConfigType =>
- val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
- // fail the command if any of the configured broker loggers do not exist
- val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
- if (invalidBrokerLoggers.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
- val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
- val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
- val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-
- case UserType | ClientType =>
- val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
- val scramConfigsToAddMap = configsToBeAdded.filter(entry => ScramMechanism.isScram(entry._1))
- val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
- val hasQuotaConfigsToDelete = configsToBeDeleted.exists(QuotaConfig.isClientOrUserQuotaConfig)
- val scramConfigsToDelete = configsToBeDeleted.filter(ScramMechanism.isScram)
- val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
- if (entityTypeHead == ClientType || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
- // either just a client or both a user and a client
- if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
- throw new IllegalArgumentException(s"Only quota configs can be added for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
- if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
- throw new IllegalArgumentException(s"Only quota configs can be deleted for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
- } else { // ConfigType.User
- if (unknownConfigsToAdd.nonEmpty)
- throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
- if (unknownConfigsToDelete.nonEmpty)
- throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
- if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
- if (entityNames.exists(_.isEmpty)) // either --entity-type users --entity-default or --user-defaults
- throw new IllegalArgumentException("The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server.")
- if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
- throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '$UserType' using --bootstrap-server.")
- }
- }
-
- if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
- alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
- } else {
- // handle altering user SCRAM credential configs
- if (entityNames.size != 1)
- // should never happen, if we get here then it is a bug
- throw new IllegalStateException(s"Altering user SCRAM credentials should never occur for more zero or multiple users: $entityNames")
- alterUserScramCredentialConfigs(adminClient, entityNames.head, scramConfigsToAddMap, scramConfigsToDelete)
- }
-
- case IpType =>
- val unknownConfigs = (configsToBeAdded.keys ++ configsToBeDeleted).filterNot(key => QuotaConfig.ipConfigs.names.contains(key))
- if (unknownConfigs.nonEmpty)
- throw new IllegalArgumentException(s"Only connection quota configs can be added for '$IpType' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
- alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
-
- case _ =>
- throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
- }
-
- if (entityNameHead.nonEmpty)
- System.out.println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
- else
- System.out.println(s"Completed updating default config for $entityTypeHead in the cluster.")
- }
-
- private def alterUserScramCredentialConfigs(adminClient: Admin, user: String, scramConfigsToAddMap: Map[String, ConfigEntry], scramConfigsToDelete: Seq[String]) = {
- val deletions = scramConfigsToDelete.map(mechanismName =>
- new UserScramCredentialDeletion(user, PublicScramMechanism.fromMechanismName(mechanismName)))
-
- def iterationsAndPasswordBytes(mechanism: ScramMechanism, credentialStr: String): (Integer, Array[Byte]) = {
- val pattern = "(?:iterations=(\\-?[0-9]*),)?password=(.*)".r
- val (iterations, password) = credentialStr match {
- case pattern(iterations, password) => (if (iterations != null && iterations != "-1") iterations.toInt else DefaultScramIterations, password)
- case _ => throw new IllegalArgumentException(s"Invalid credential property $mechanism=$credentialStr")
- }
- if (iterations < mechanism.minIterations)
- throw new IllegalArgumentException(s"Iterations $iterations is less than the minimum ${mechanism.minIterations} required for ${mechanism.mechanismName}")
- (iterations, password.getBytes(StandardCharsets.UTF_8))
- }
-
- val upsertions = scramConfigsToAddMap.map { case (mechanismName, configEntry) =>
- val (iterations, passwordBytes) = iterationsAndPasswordBytes(ScramMechanism.forMechanismName(mechanismName), configEntry.value)
- new UserScramCredentialUpsertion(user, new ScramCredentialInfo(PublicScramMechanism.fromMechanismName(mechanismName), iterations), passwordBytes)
- }
- // we are altering only a single user by definition, so we don't have to worry about one user succeeding and another
- // failing; therefore just check the success of all the futures (since there will only be 1)
- adminClient.alterUserScramCredentials((deletions ++ upsertions).toList.asJava).all.get(60, TimeUnit.SECONDS)
- }
-
- private def alterQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String], configsToBeAddedMap: Map[String, String], configsToBeDeleted: Seq[String]) = {
- // handle altering client/user quota configs
- val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames)
-
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
- val alterEntityTypes = entityTypes.map {
- case UserType => ClientQuotaEntity.USER
- case ClientType => ClientQuotaEntity.CLIENT_ID
- case IpType => ClientQuotaEntity.IP
- case entType => throw new IllegalArgumentException(s"Unexpected entity type: $entType")
- }
- val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
-
- // Explicitly populate a HashMap to ensure nulls are recorded properly.
- val alterEntityMap = new java.util.HashMap[String, String]
- alterEntityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) }
- val entity = new ClientQuotaEntity(alterEntityMap)
-
- val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
- val alterOps = (configsToBeAddedMap.map { case (key, value) =>
- val doubleValue = try value.toDouble catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"Cannot parse quota configuration value for $key: $value")
- }
- new ClientQuotaAlteration.Op(key, doubleValue)
- } ++ configsToBeDeleted.map(key => new ClientQuotaAlteration.Op(key, null))).asJavaCollection
-
- adminClient.alterClientQuotas(Collections.singleton(new ClientQuotaAlteration(entity, alterOps)), alterOptions)
- .all().get(60, TimeUnit.SECONDS)
- }
-
- def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
- val entityTypes = opts.entityTypes
- val entityNames = opts.entityNames
- val describeAll = opts.options.has(opts.allOpt)
-
- entityTypes.head match {
- case TopicType | BrokerType | BrokerLoggerConfigType | ClientMetricsType | GroupType =>
- describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
- case UserType | ClientType =>
- describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
- case IpType =>
- describeQuotaConfigs(adminClient, entityTypes, entityNames)
- case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
- }
- }
-
- private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = {
- if (!describeAll) {
- entityName.foreach { name =>
- entityType match {
- case TopicType =>
- Topic.validate(name)
- if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
- System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
- return
- }
- case BrokerType | BrokerLoggerConfigType =>
- if (adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
- // valid broker id
- } else if (name == BrokerDefaultEntityName) {
- // default broker configs
- } else {
- System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
- return
- }
- case ClientMetricsType =>
- if (adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all.get
- .stream.noneMatch(_.name == name)) {
- System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
- return
- }
- case GroupType =>
- if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId == name) && listGroupConfigResources(adminClient).exists(resources => resources.stream.noneMatch(_.name == name))) {
- System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
- return
- }
- case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
- }
- }
- }
-
- val entities = entityName
- .map(name => List(name))
- .getOrElse(entityType match {
- case TopicType =>
- adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
- case BrokerType | BrokerLoggerConfigType =>
- adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
- case ClientMetricsType =>
- adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq
- case GroupType =>
- adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).map(resources => resources.asScala.map(_.name).toSet).getOrElse(Set.empty)
- case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
- })
-
- entities.foreach { entity =>
- entity match {
- case BrokerDefaultEntityName =>
- System.out.println(s"Default configs for $entityType in the cluster are:")
- case _ =>
- val configSourceStr = if (describeAll) "All" else "Dynamic"
- System.out.println(s"$configSourceStr configs for ${entityType.dropRight(1)} $entity are:")
- }
- getResourceConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry =>
- val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ")
- System.out.println(s" ${entry.name}=${entry.value} sensitive=${entry.isSensitive} synonyms={$synonyms}")
- }
- }
- }
-
- private def alterResourceConfig(adminClient: Admin, entityTypeHead: String, entityNameHead: String, configsToBeDeleted: Seq[String], configsToBeAdded: Map[String, ConfigEntry], resourceType: ConfigResource.Type): Unit = {
- val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
- val configResource = new ConfigResource(resourceType, entityNameHead)
- val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
- val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
- val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
- }
-
- private def getResourceConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
- def validateBrokerId(): Unit = try entityName.toInt catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName")
- }
-
- val (configResourceType, dynamicConfigSource) = entityType match {
- case TopicType =>
- if (entityName.nonEmpty)
- Topic.validate(entityName)
- (ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG))
- case BrokerType => entityName match {
- case BrokerDefaultEntityName =>
- (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
- case _ =>
- validateBrokerId()
- (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG))
- }
- case BrokerLoggerConfigType =>
- if (entityName.nonEmpty)
- validateBrokerId()
- (ConfigResource.Type.BROKER_LOGGER, None)
- case ClientMetricsType =>
- (ConfigResource.Type.CLIENT_METRICS, Some(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG))
- case GroupType =>
- (ConfigResource.Type.GROUP, Some(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG))
- case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
- }
-
- val configSourceFilter = if (describeAll)
- None
- else
- dynamicConfigSource
-
- val configResource = new ConfigResource(configResourceType, entityName)
- val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms)
- val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
- .all.get(30, TimeUnit.SECONDS)
- configs.get(configResource).entries.asScala
- .filter(entry => configSourceFilter match {
- case Some(configSource) => entry.source == configSource
- case None => true
- }).toSeq.sortBy(entry => entry.name())
- }
-
- private def describeQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = {
- val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
- quotaConfigs.foreachEntry { (entity, entries) =>
- val entityEntries = entity.entries.asScala
-
- def entitySubstr(entityType: String): Option[String] =
- entityEntries.get(entityType).map { name =>
- val typeStr = entityType match {
- case ClientQuotaEntity.USER => "user-principal"
- case ClientQuotaEntity.CLIENT_ID => "client-id"
- case ClientQuotaEntity.IP => "ip"
- }
- if (name != null) s"$typeStr '$name'"
- else s"the default $typeStr"
- }
-
- val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++
- entitySubstr(ClientQuotaEntity.CLIENT_ID) ++
- entitySubstr(ClientQuotaEntity.IP)).mkString(", ")
- val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
- System.out.println(s"Quota configs for $entityStr are $entriesStr")
- }
- }
-
- private def describeClientQuotaAndUserScramCredentialConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = {
- describeQuotaConfigs(adminClient, entityTypes, entityNames)
- // we describe user SCRAM credentials only when we are not describing client information
- // and we are not given either --entity-default or --user-defaults
- if (!entityTypes.contains(ClientType) && !entityNames.contains("")) {
- val result = adminClient.describeUserScramCredentials(entityNames.asJava)
- result.users.get(30, TimeUnit.SECONDS).asScala.foreach(user => {
- try {
- val description = result.description(user).get(30, TimeUnit.SECONDS)
- val descriptionText = description.credentialInfos.asScala.map(info => s"${info.mechanism.mechanismName}=iterations=${info.iterations}").mkString(", ")
- System.out.println(s"SCRAM credential configs for user-principal '$user' are $descriptionText")
- } catch {
- case e: Exception => System.out.println(s"Error retrieving SCRAM credential configs for user-principal '$user': ${e.getClass.getSimpleName}: ${e.getMessage}")
- }
- })
- }
- }
-
- private def getClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Map[String, java.lang.Double] = {
- if (entityTypes.size != entityNames.size)
- throw new IllegalArgumentException("Exactly one entity name must be specified for every entity type")
- getAllClientQuotasConfigs(adminClient, entityTypes, entityNames).headOption.map(_._2.asScala).getOrElse(Map.empty)
- }
-
- private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
- val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) =>
- val entityType = entityTypeOpt match {
- case Some(UserType) => ClientQuotaEntity.USER
- case Some(ClientType) => ClientQuotaEntity.CLIENT_ID
- case Some(IpType) => ClientQuotaEntity.IP
- case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
- case None => throw new IllegalArgumentException("More entity names specified than entity types")
- }
- entityNameOpt match {
- case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
- case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType, name)
- case None => ClientQuotaFilterComponent.ofEntityType(entityType)
- }
- }
-
- adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala
- }
-
- private def listGroupConfigResources(adminClient: Admin): Option[java.util.Collection[ConfigResource]] = {
- try {
- Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get)
- } catch {
- // (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersionException and ClusterAuthorizationException as None
- case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] => None
- case e: ExecutionException if e.getCause.isInstanceOf[ClusterAuthorizationException] => None
- case e: ExecutionException => throw e.getCause
- }
- }
-
-
- class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
- val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
- .withRequiredArg
- .describedAs("server to connect to")
- .ofType(classOf[String])
- val bootstrapControllerOpt: OptionSpec[String] = parser.accepts("bootstrap-controller", "The Kafka controllers to connect to.")
- .withRequiredArg
- .describedAs("controller to connect to")
- .ofType(classOf[String])
- val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
- "This is used only with --bootstrap-server option for describing and altering broker configs.")
- .withRequiredArg
- .describedAs("command config property file")
- .ofType(classOf[String])
- val alterOpt: OptionSpecBuilder = parser.accepts("alter", "Alter the configuration for the entity.")
- val describeOpt: OptionSpecBuilder = parser.accepts("describe", "List configs for the given entity.")
- val allOpt: OptionSpecBuilder = parser.accepts("all", "List all configs for the given entity, including static configs if available.")
-
- val entityType: OptionSpec[String] = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers/ips/client-metrics/groups)")
- .withRequiredArg
- .ofType(classOf[String])
- val entityName: OptionSpec[String] = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id/ip/client metrics/group id)")
- .withRequiredArg
- .ofType(classOf[String])
- private val entityDefault: OptionSpecBuilder = parser.accepts("entity-default", "Default entity name for clients/users/brokers/ips (applies to corresponding entity type)")
-
- private val nl: String = System.lineSeparator()
- val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
- "For entity-type '" + TopicType + "': " + LogConfig.nonInternalConfigNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
- "For entity-type '" + BrokerType + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
- "For entity-type '" + UserType + "': " + QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
- "For entity-type '" + ClientType + "': " + QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
- "For entity-type '" + IpType + "': " + QuotaConfig.ipConfigs.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
- "For entity-type '" + ClientMetricsType + "': " + ClientMetricsConfigs.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
- "For entity-type '" + GroupType + "': " + GroupConfig.CONFIG_DEF.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
- s"Entity types '$UserType' and '$ClientType' may be specified together to update config for clients of a specific user.")
- .withRequiredArg
- .ofType(classOf[String])
- val addConfigFile: OptionSpec[String] = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
- .withRequiredArg
- .ofType(classOf[String])
- val deleteConfig: OptionSpec[String] = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
- .withRequiredArg
- .ofType(classOf[String])
- .withValuesSeparatedBy(',')
- val topic: OptionSpec[String] = parser.accepts("topic", "The topic's name.")
- .withRequiredArg
- .ofType(classOf[String])
- val client: OptionSpec[String] = parser.accepts("client", "The client's ID.")
- .withRequiredArg
- .ofType(classOf[String])
- private val clientDefaults = parser.accepts("client-defaults", "The config defaults for all clients.")
- val user: OptionSpec[String] = parser.accepts("user", "The user's principal name.")
- .withRequiredArg
- .ofType(classOf[String])
- private val userDefaults = parser.accepts("user-defaults", "The config defaults for all users.")
- val broker: OptionSpec[String] = parser.accepts("broker", "The broker's ID.")
- .withRequiredArg
- .ofType(classOf[String])
- private val brokerDefaults = parser.accepts("broker-defaults", "The config defaults for all brokers.")
- private val brokerLogger = parser.accepts("broker-logger", "The broker's ID for its logger config.")
- .withRequiredArg
- .ofType(classOf[String])
- private val ipDefaults = parser.accepts("ip-defaults", "The config defaults for all IPs.")
- val ip: OptionSpec[String] = parser.accepts("ip", "The IP address.")
- .withRequiredArg
- .ofType(classOf[String])
- val group: OptionSpec[String] = parser.accepts("group", "The group's ID.")
- .withRequiredArg
- .ofType(classOf[String])
- val clientMetrics: OptionSpec[String] = parser.accepts("client-metrics", "The client metrics config resource name.")
- .withRequiredArg
- .ofType(classOf[String])
- options = parser.parse(args : _*)
-
- private val entityFlags = List((topic, TopicType),
- (client, ClientType),
- (user, UserType),
- (broker, BrokerType),
- (brokerLogger, BrokerLoggerConfigType),
- (ip, IpType),
- (clientMetrics, ClientMetricsType),
- (group, GroupType))
-
- private val entityDefaultsFlags = List((clientDefaults, ClientType),
- (userDefaults, UserType),
- (brokerDefaults, BrokerType),
- (ipDefaults, IpType))
-
- private[admin] def entityTypes: List[String] = {
- options.valuesOf(entityType).asScala.toList ++
- (entityFlags ++ entityDefaultsFlags).filter(entity => options.has(entity._1)).map(_._2)
- }
-
- private[admin] def entityNames: List[String] = {
- val namesIterator = options.valuesOf(entityName).iterator
- options.specs.asScala
- .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
- .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "").toList ++
- entityFlags
- .filter(entity => options.has(entity._1))
- .map(entity => options.valueOf(entity._1)) ++
- entityDefaultsFlags
- .filter(entity => options.has(entity._1))
- .map(_ => "")
- }
-
- def checkArgs(): Unit = {
- // should have exactly one action
- val actions = Seq(alterOpt, describeOpt).count(options.has _)
- if (actions != 1)
- CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --describe, --alter")
- // check required args
- CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, describeOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, alterOpt, addConfig, deleteConfig)
-
- val entityTypeVals = entityTypes
- if (entityTypeVals.size != entityTypeVals.distinct.size)
- throw new IllegalArgumentException(s"Duplicate entity type(s) specified: ${entityTypeVals.diff(entityTypeVals.distinct).mkString(",")}")
-
- val (allowedEntityTypes, connectOptString) =
- if (options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt)) {
- (BrokerSupportedConfigTypes, "--bootstrap-server or --bootstrap-controller")
- } else {
- throw new IllegalArgumentException("Either --bootstrap-server or --bootstrap-controller must be specified.")
- }
-
- entityTypeVals.foreach(entityTypeVal =>
- if (!allowedEntityTypes.contains(entityTypeVal))
- throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(", ")} with a $connectOptString argument")
- )
- if (entityTypeVals.isEmpty)
- throw new IllegalArgumentException("At least one entity type must be specified")
- else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(UserType, ClientType)))
- throw new IllegalArgumentException(s"Only '$UserType' and '$ClientType' entity types may be specified together")
-
- if ((options.has(entityName) || options.has(entityType) || options.has(entityDefault)) &&
- (entityFlags ++ entityDefaultsFlags).exists(entity => options.has(entity._1)))
- throw new IllegalArgumentException("--entity-{type,name,default} should not be used in conjunction with specific entity flags")
-
- val hasEntityName = entityNames.exists(_.nonEmpty)
- val hasEntityDefault = entityNames.exists(_.isEmpty)
-
- val numConnectOptions = (if (options.has(bootstrapServerOpt)) 1 else 0) +
- (if (options.has(bootstrapControllerOpt)) 1 else 0)
- if (numConnectOptions > 1)
- throw new IllegalArgumentException("Only one of --bootstrap-server or --bootstrap-controller can be specified")
- if (hasEntityName && (entityTypeVals.contains(BrokerType) || entityTypeVals.contains(BrokerLoggerConfigType))) {
- Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
- try brokerId.toInt catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid integer broker id, but it is: $brokerId")
- }
- }
- }
-
- if (hasEntityName && entityTypeVals.contains(IpType)) {
- Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity =>
- if (!isValidIpEntity(ipEntity))
- throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity")
- }
- }
-
- if (options.has(describeOpt)) {
- if (!(entityTypeVals.contains(UserType) ||
- entityTypeVals.contains(ClientType) ||
- entityTypeVals.contains(BrokerType) ||
- entityTypeVals.contains(IpType)) && options.has(entityDefault)) {
- throw new IllegalArgumentException(s"--entity-default must not be specified with --describe of ${entityTypeVals.mkString(",")}")
- }
-
- if (entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName)
- throw new IllegalArgumentException(s"An entity name must be specified with --describe of ${entityTypeVals.mkString(",")}")
- }
-
- if (options.has(alterOpt)) {
- if (entityTypeVals.contains(UserType) ||
- entityTypeVals.contains(ClientType) ||
- entityTypeVals.contains(BrokerType) ||
- entityTypeVals.contains(IpType)) {
- if (!hasEntityName && !hasEntityDefault)
- throw new IllegalArgumentException("An entity-name or default entity must be specified with --alter of users, clients, brokers or ips")
- } else if (!hasEntityName)
- throw new IllegalArgumentException(s"An entity name must be specified with --alter of ${entityTypeVals.mkString(",")}")
-
- val isAddConfigPresent = options.has(addConfig)
- val isAddConfigFilePresent = options.has(addConfigFile)
- val isDeleteConfigPresent = options.has(deleteConfig)
-
- if (isAddConfigPresent && isAddConfigFilePresent)
- throw new IllegalArgumentException("Only one of --add-config or --add-config-file must be specified")
-
- if (!isAddConfigPresent && !isAddConfigFilePresent && !isDeleteConfigPresent)
- throw new IllegalArgumentException("At least one of --add-config, --add-config-file, or --delete-config must be specified with --alter")
- }
- }
- }
-
- def isValidIpEntity(ip: String): Boolean = {
- try {
- InetAddress.getByName(ip)
- } catch {
- case _: UnknownHostException => return false
- }
- true
- }
-}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a9f3fa1c9e2d0..a99561508ff7c 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -28,7 +28,6 @@ import java.util.{Optional, Properties}
import java.util.concurrent._
import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
-import kafka.admin.ConfigCommand
import kafka.api.SaslSetup
import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
import kafka.security.JaasTestUtils
@@ -237,7 +236,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
val adminClient = adminClients.head
- alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
+ alterSslKeystore(sslProperties1, SecureExternal)
val configDesc = TestUtils.tryUntilNoAssertionError() {
val describeConfigsResult = describeConfig(adminClient)
@@ -314,7 +313,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// 3. update password property using config provider
updatedProps.put(configPrefix + SSL_KEYSTORE_PASSWORD_CONFIG, SslKeystorePasswordVal)
- alterConfigsUsingConfigCommand(updatedProps)
+ alterConfigs(servers, adminClients.head, updatedProps, perBrokerConfig = true)
waitForConfig(TestMetricsReporter.PollingIntervalProp, "1000")
waitForConfig(configPrefix + SSL_TRUSTSTORE_TYPE_CONFIG, "JKS")
waitForConfig(configPrefix + SSL_KEYSTORE_PASSWORD_CONFIG, "ServerPassword")
@@ -328,7 +327,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// verify the update
// 1. verify update not occurring if the value of property is same.
- alterConfigsUsingConfigCommand(updatedProps)
+ alterConfigs(servers, adminClients.head, updatedProps, perBrokerConfig = true)
waitForConfig(TestMetricsReporter.PollingIntervalProp, "1000")
reporters.foreach { reporter =>
reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 1000)
@@ -336,7 +335,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// 2. verify update occurring if the value of property changed.
updatedProps.put(TestMetricsReporter.PollingIntervalProp, PollingIntervalUpdateVal)
- alterConfigsUsingConfigCommand(updatedProps)
+ alterConfigs(servers, adminClients.head, updatedProps, perBrokerConfig = true)
waitForConfig(TestMetricsReporter.PollingIntervalProp, "2000")
reporters.foreach { reporter =>
reporter.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 2000)
@@ -359,7 +358,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyAuthenticationFailure(producer1)
// Update broker keystore for external listener
- alterSslKeystoreUsingConfigCommand(sslProperties2, SecureExternal)
+ alterSslKeystore(sslProperties2, SecureExternal)
// New producer with old truststore should fail to connect
val producer2 = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build()
@@ -1081,7 +1080,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val configPrefix = listenerPrefix(SecureExternal)
val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS, configPrefix)
updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
- alterConfigsUsingConfigCommand(updatedProps)
+ alterConfigs(servers, adminClients.head, updatedProps, perBrokerConfig = true)
verifyConfiguration(false)
// Ensure it remains off after shutdown.
@@ -1093,7 +1092,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// Turn verification back on.
updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
- alterConfigsUsingConfigCommand(updatedProps)
+ alterConfigs(servers, adminClients.head, updatedProps, perBrokerConfig = true)
verifyConfiguration(true)
}
@@ -1112,7 +1111,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
servers += newBroker
- alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal))
+ alterSslKeystore(sslProperties1, listenerPrefix(SecureExternal))
// Add num.replica.fetchers to the cluster-level config.
val clusterLevelProps = new Properties
@@ -1254,15 +1253,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)), expectFailure)
}
- private def alterSslKeystoreUsingConfigCommand(props: Properties, listener: String): Unit = {
- val configPrefix = listenerPrefix(listener)
- val newProps = securityProps(props, KEYSTORE_PROPS, configPrefix)
- alterConfigsUsingConfigCommand(newProps)
- waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
- }
-
private def alterConfigsOnServer(server: KafkaBroker, props: Properties): Unit = {
-val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new ConfigEntry(k, v), OpType.SET) }.toList.asJava
+ val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new ConfigEntry(k, v), OpType.SET) }.toList.asJava
val alterConfigs = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
alterConfigs.put(new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString), configEntries)
adminClients.head.incrementalAlterConfigs(alterConfigs)
@@ -1388,21 +1380,6 @@ val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new Con
}
}
- private def alterConfigsUsingConfigCommand(props: Properties): Unit = {
- val propsFile = tempPropertiesFile(clientProps(SecurityProtocol.SSL))
-
- servers.foreach { server =>
- val args = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, new ListenerName(SecureInternal)),
- "--command-config", propsFile.getAbsolutePath,
- "--alter", "--add-config", props.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
- "--entity-type", "brokers",
- "--entity-name", server.config.brokerId.toString)
- ConfigCommand.main(args)
- }
- }
-
- private def tempPropertiesFile(properties: Properties): File = TestUtils.tempPropertiesFile(properties.asScala)
-
private abstract class ClientBuilder[T] {
protected var _bootstrapServers: Option[String] = None
protected var _listenerName: String = SecureExternal
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
new file mode 100644
index 0000000000000..22780c32d5814
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
@@ -0,0 +1,1057 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterClientQuotasOptions;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeClusterOptions;
+import org.apache.kafka.clients.admin.DescribeConfigsOptions;
+import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ScramCredentialInfo;
+import org.apache.kafka.clients.admin.UserScramCredentialAlteration;
+import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
+import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
+import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.server.config.ConfigType;
+import org.apache.kafka.server.config.DynamicConfig;
+import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+
+/**
+ * This script can be used to change configs for topics/clients/users/brokers/ips/client-metrics/groups dynamically
+ * An entity described or altered by the command may be one of:
+ *
+ * - topic: --topic OR --entity-type topics --entity-name
+ *
- client: --client OR --entity-type clients --entity-name
+ *
- user: --user OR --entity-type users --entity-name
+ *
- : --user --client OR
+ * --entity-type users --entity-name --entity-type clients --entity-name
+ *
- broker: --broker OR --entity-type brokers --entity-name
+ *
- broker-logger: --broker-logger OR --entity-type broker-loggers --entity-name
+ *
- ip: --ip OR --entity-type ips --entity-name
+ *
- client-metrics: --client-metrics OR --entity-type client-metrics --entity-name
+ *
- group: --group OR --entity-type groups --entity-name
+ *
+ * --entity-type --entity-default may be specified in place of --entity-type --entity-name
+ * when describing or altering default configuration for users, clients, brokers, or ips, respectively.
+ * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or --ip-defaults may be specified in place of
+ * --entity-type --entity-default, respectively.
+ */
+public class ConfigCommand {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigCommand.class);
+
+ private static final String BROKER_DEFAULT_ENTITY_NAME = "";
+ private static final List BROKER_SUPPORTED_CONFIG_TYPES;
+ private static final int DEFAULT_SCRAM_ITERATIONS = 4096;
+ private static final String TOPIC_TYPE = ConfigType.TOPIC.value();
+ private static final String CLIENT_METRICS_TYPE = ConfigType.CLIENT_METRICS.value();
+ private static final String BROKER_TYPE = ConfigType.BROKER.value();
+ private static final String GROUP_TYPE = ConfigType.GROUP.value();
+ private static final String USER_TYPE = ConfigType.USER.value();
+ private static final String CLIENT_TYPE = ConfigType.CLIENT.value();
+ private static final String IP_TYPE = ConfigType.IP.value();
+
+ static final String BROKER_LOGGER_CONFIG_TYPE = "broker-loggers";
+ static {
+ BROKER_SUPPORTED_CONFIG_TYPES = new ArrayList<>();
+ BROKER_SUPPORTED_CONFIG_TYPES.add(BROKER_LOGGER_CONFIG_TYPE);
+ for (ConfigType configType : ConfigType.values()) {
+ BROKER_SUPPORTED_CONFIG_TYPES.add(configType.value());
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ ConfigCommandOptions opts = new ConfigCommandOptions(args);
+ CommandLineUtils.maybePrintHelpOrVersion(opts,
+ "This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip, client-metrics or group");
+ opts.checkArgs();
+ processCommand(opts);
+ } catch (UnsupportedVersionException uve) {
+ LOG.debug("Unsupported API encountered in server when executing config command with args '{}'", String.join(" ", args));
+ System.err.println(uve.getMessage());
+ Exit.exit(1);
+ } catch (IllegalArgumentException | InvalidConfigurationException | OptionException e) {
+ LOG.debug("Failed config command with args '{}'", String.join(" ", args), e);
+ System.err.println(e.getMessage());
+ Exit.exit(1);
+ } catch (Throwable t) {
+ LOG.debug("Error while executing config command with args '{}'", String.join(" ", args), t);
+ System.err.println("Error while executing config command with args '" + String.join(" ", args) + "'");
+ t.printStackTrace(System.err);
+ Exit.exit(1);
+ }
+ }
+
+ static Properties parseConfigsToBeAdded(ConfigCommandOptions opts) throws IOException {
+ Properties props = new Properties();
+ if (opts.options.has(opts.addConfigFile)) {
+ String file = opts.options.valueOf(opts.addConfigFile);
+ props.putAll(Utils.loadProps(file));
+ }
+ if (opts.options.has(opts.addConfig)) {
+ // Split list by commas, but avoid those in [], then into KV pairs
+ // Each KV pair is of format key=value, split them into key and value, using -1 as the limit for split() to
+ // include trailing empty strings. This is to support empty value (e.g. 'ssl.endpoint.identification.algorithm=')
+ String pattern = "(?=[^\\]]*(?:\\[|$))";
+ String[][] configsToBeAdded = Stream.of(opts.options.valueOf(opts.addConfig).split("," + pattern))
+ .map(s -> s.split("\\s*=\\s*" + pattern, -1))
+ .toArray(String[][]::new);
+
+ if (Stream.of(configsToBeAdded).anyMatch(config -> config.length != 2)) {
+ throw new IllegalArgumentException("Invalid entity config: all configs to be added must be in the format \"key=val\" or \"key=[val1,val2]\" to group values which contain commas.");
+ }
+
+ //Create properties, parsing square brackets from values if necessary
+ Stream.of(configsToBeAdded).forEach(pair ->
+ props.setProperty(pair[0].trim(), pair[1].replaceAll("\\[?\\]?", "").trim())
+ );
+ }
+ validatePropsKey(props);
+ return props;
+ }
+
+ static List parseConfigsToBeDeleted(ConfigCommandOptions opts) {
+ if (opts.options.has(opts.deleteConfig)) {
+ return opts.options.valuesOf(opts.deleteConfig).stream().map(String::trim).toList();
+ } else {
+ return List.of();
+ }
+ }
+
+ private static void validatePropsKey(Properties props) {
+ props.keySet().forEach(propsKey -> {
+ // Allows the '$' symbol to support valid logger names for internal classes (e.g. org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper)
+ if (!propsKey.toString().matches("[$a-zA-Z0-9._-]*")) {
+ throw new IllegalArgumentException("Invalid character found for config key: " + propsKey);
+ }
+ });
+ }
+
+ private static void processCommand(ConfigCommandOptions opts) throws Exception {
+ Properties props = opts.options.has(opts.commandConfigOpt)
+ ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ : new Properties();
+ CommandLineUtils.initializeBootstrapProperties(opts.parser,
+ opts.options,
+ props,
+ opts.bootstrapServerOpt,
+ opts.bootstrapControllerOpt);
+
+ if (opts.options.has(opts.alterOpt) && opts.entityTypes().size() != opts.entityNames().size()) {
+ throw new IllegalArgumentException("An entity name must be specified for every entity type");
+ }
+
+ try (Admin adminClient = Admin.create(props)) {
+ if (opts.options.has(opts.alterOpt)) {
+ alterConfig(adminClient, opts);
+ } else if (opts.options.has(opts.describeOpt)) {
+ describeConfig(adminClient, opts);
+ }
+ }
+ }
+
+ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOException, ExecutionException, InterruptedException, TimeoutException {
+ List entityTypes = opts.entityTypes();
+ List entityNames = opts.entityNames();
+ String entityType = entityTypes.get(0);
+ String entityName = entityNames.get(0);
+ Properties configsToBeAddedProps = parseConfigsToBeAdded(opts);
+ Map configsToBeAddedMap = configsToBeAddedProps.entrySet().stream()
+ .collect(Collectors.toMap(
+ e -> e.getKey().toString(),
+ e -> e.getValue().toString()
+ ));
+ Map configsToBeAdded = configsToBeAddedMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new ConfigEntry(e.getKey(), e.getValue())
+ ));
+ List configsToBeDeleted = parseConfigsToBeDeleted(opts);
+
+ if (TOPIC_TYPE.equals(entityType) || CLIENT_METRICS_TYPE.equals(entityType) ||
+ BROKER_TYPE.equals(entityType) || GROUP_TYPE.equals(entityType)) {
+ ConfigResource.Type configResourceType;
+ if (TOPIC_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.TOPIC;
+ } else if (CLIENT_METRICS_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.CLIENT_METRICS;
+ } else if (BROKER_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.BROKER;
+ } else {
+ configResourceType = ConfigResource.Type.GROUP;
+ }
+ try {
+ alterResourceConfig(adminClient, entityType, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
+ } catch (ExecutionException ee) {
+ if (ee.getCause() instanceof UnsupportedVersionException) {
+ throw new UnsupportedVersionException("The " + ApiKeys.INCREMENTAL_ALTER_CONFIGS + " API is not supported by the cluster. The API is supported starting from version 2.3.0."
+ + " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.");
+ }
+ throw ee;
+ }
+ } else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
+ List validLoggers = getResourceConfig(adminClient, entityType, entityName, true, false).stream().map(ConfigEntry::name).toList();
+ // fail the command if any of the configured broker loggers do not exist
+ List invalidBrokerLoggers = Stream.concat(
+ configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)),
+ configsToBeAdded.keySet().stream().filter(c -> !validLoggers.contains(c))
+ ).toList();
+ if (!invalidBrokerLoggers.isEmpty())
+ throw new InvalidConfigurationException("Invalid broker logger(s): " + String.join(",", invalidBrokerLoggers));
+
+ ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName);
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+ List addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
+ List deleteEntries = configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)).toList();
+ Collection alterEntries = Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
+ adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions).all().get(60, TimeUnit.SECONDS);
+ } else if (USER_TYPE.equals(entityType) || CLIENT_TYPE.equals(entityType)) {
+ boolean hasQuotaConfigsToAdd = configsToBeAdded.keySet().stream()
+ .anyMatch(QuotaConfig::isClientOrUserQuotaConfig);
+ Map scramConfigsToAddMap = configsToBeAdded.entrySet().stream()
+ .filter(entry -> ScramMechanism.isScram(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Set unknownConfigsToAdd = configsToBeAdded.keySet().stream()
+ .filter(key -> !ScramMechanism.isScram(key) && !QuotaConfig.isClientOrUserQuotaConfig(key))
+ .collect(Collectors.toSet());
+ boolean hasQuotaConfigsToDelete = configsToBeDeleted.stream()
+ .anyMatch(QuotaConfig::isClientOrUserQuotaConfig);
+ List scramConfigsToDelete = configsToBeDeleted.stream()
+ .filter(ScramMechanism::isScram)
+ .toList();
+ Set unknownConfigsToDelete = configsToBeDeleted.stream()
+ .filter(key -> !ScramMechanism.isScram(key) && !QuotaConfig.isClientOrUserQuotaConfig(key))
+ .collect(Collectors.toSet());
+
+ if (CLIENT_TYPE.equals(entityType) || entityTypes.size() == 2) { // size==2 for case where users is specified first on the command line, before clients
+ // either just a client or both a user and a client
+ if (!unknownConfigsToAdd.isEmpty() || !scramConfigsToAddMap.isEmpty()) {
+ Set combined = new HashSet<>(unknownConfigsToAdd);
+ combined.addAll(scramConfigsToAddMap.keySet());
+ throw new IllegalArgumentException("Only quota configs can be added for '" + CLIENT_TYPE + "' using --bootstrap-server. Unexpected config names: " + String.join(",", combined));
+ }
+ if (!unknownConfigsToDelete.isEmpty() || !scramConfigsToDelete.isEmpty()) {
+ Set combined = new HashSet<>(unknownConfigsToDelete);
+ combined.addAll(scramConfigsToDelete);
+ throw new IllegalArgumentException("Only quota configs can be deleted for '" + CLIENT_TYPE + "' using --bootstrap-server. Unexpected config names: " + String.join(",", combined));
+ }
+ } else { // ConfigType.User
+ if (!unknownConfigsToAdd.isEmpty())
+ throw new IllegalArgumentException("Only quota and SCRAM credential configs can be added for '" + USER_TYPE + "' using --bootstrap-server. Unexpected config names: " + String.join(",", unknownConfigsToAdd));
+ if (!unknownConfigsToDelete.isEmpty())
+ throw new IllegalArgumentException("Only quota and SCRAM credential configs can be deleted for '" + USER_TYPE + "' using --bootstrap-server. Unexpected config names: " + String.join(",", unknownConfigsToDelete));
+ if (!scramConfigsToAddMap.isEmpty() || !scramConfigsToDelete.isEmpty()) {
+ if (entityNames.stream().anyMatch(String::isEmpty)) // either --entity-type users --entity-default or --user-defaults
+ throw new IllegalArgumentException("The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server.");
+ if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
+ throw new IllegalArgumentException("Cannot alter both quota and SCRAM credential configs simultaneously for '" + USER_TYPE + "' using --bootstrap-server.");
+ }
+ }
+
+ if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
+ alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted);
+ } else {
+ // handle altering user SCRAM credential configs
+ if (entityNames.size() != 1) {
+ // should never happen, if we get here then it is a bug
+ throw new IllegalStateException("Altering user SCRAM credentials should never occur for more zero or multiple users: " + entityNames);
+ }
+ alterUserScramCredentialConfigs(adminClient, entityNames.get(0), scramConfigsToAddMap, scramConfigsToDelete);
+ }
+ } else if (IP_TYPE.equals(entityType)) {
+ Set allConfigNames = new HashSet<>(configsToBeAdded.keySet());
+ allConfigNames.addAll(configsToBeDeleted);
+ Set unknownConfigs = allConfigNames.stream()
+ .filter(key -> !QuotaConfig.ipConfigs().names().contains(key))
+ .collect(Collectors.toSet());
+ if (!unknownConfigs.isEmpty()) {
+ throw new IllegalArgumentException("Only connection quota configs can be added for '" + IP_TYPE + "' using --bootstrap-server. Unexpected config names: " + String.join(", ", unknownConfigs));
+ }
+ alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted);
+ } else {
+ throw new IllegalArgumentException("Unsupported entity type: " + entityType);
+ }
+
+ if (!entityName.isEmpty()) {
+ String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
+ System.out.println("Completed updating config for " + entityTypeSingular + " " + entityName + ".");
+ } else {
+ System.out.println("Completed updating default config for " + entityType + " in the cluster.");
+ }
+ }
+
+ private record IterationsAndPassword(int iterations, byte[] passwordBytes) {
+ }
+
+ private static IterationsAndPassword parseIterationsAndPasswordBytes(org.apache.kafka.common.security.scram.internals.ScramMechanism mechanism, String credentialStr) {
+ Pattern pattern = Pattern.compile("(?:iterations=(\\-?[0-9]*),)?password=(.*)");
+ Matcher matcher = pattern.matcher(credentialStr);
+
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Invalid credential property " + mechanism + "=" + credentialStr);
+ }
+
+ String iterationsStr = matcher.group(1);
+ String password = matcher.group(2);
+
+ int iterations = (iterationsStr != null && !"-1".equals(iterationsStr))
+ ? Integer.parseInt(iterationsStr)
+ : DEFAULT_SCRAM_ITERATIONS;
+
+ if (iterations < mechanism.minIterations()) {
+ throw new IllegalArgumentException("Iterations " + iterations + " is less than the minimum " + mechanism.minIterations() + " required for " + mechanism.mechanismName());
+ }
+
+ return new IterationsAndPassword(iterations, password.getBytes(StandardCharsets.UTF_8));
+ }
+
+ private static void alterUserScramCredentialConfigs(Admin adminClient, String user, Map scramConfigsToAddMap, List scramConfigsToDelete) throws ExecutionException, InterruptedException, TimeoutException {
+ List deletions = scramConfigsToDelete.stream()
+ .map(mechanismName -> new UserScramCredentialDeletion(user, org.apache.kafka.clients.admin.ScramMechanism.fromMechanismName(mechanismName)))
+ .toList();
+
+ List upsertions = scramConfigsToAddMap.entrySet().stream()
+ .map(entry -> {
+ String mechanismName = entry.getKey();
+ ConfigEntry configEntry = entry.getValue();
+ org.apache.kafka.common.security.scram.internals.ScramMechanism mechanism =
+ org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanismName);
+ IterationsAndPassword result = parseIterationsAndPasswordBytes(mechanism, configEntry.value());
+ return new UserScramCredentialUpsertion(
+ user,
+ new ScramCredentialInfo(org.apache.kafka.clients.admin.ScramMechanism.fromMechanismName(mechanismName), result.iterations),
+ result.passwordBytes
+ );
+ })
+ .toList();
+
+ // we are altering only a single user by definition, so we don't have to worry about one user succeeding and another
+ // failing; therefore just check the success of all the futures (since there will only be 1)
+ List allCredentials = new ArrayList<>();
+ allCredentials.addAll(deletions);
+ allCredentials.addAll(upsertions);
+ adminClient.alterUserScramCredentials(allCredentials).all().get(60, TimeUnit.SECONDS);
+ }
+
+ private static void alterQuotaConfigs(Admin adminClient, List entityTypes, List entityNames, Map configsToBeAddedMap, List configsToBeDeleted) throws ExecutionException, InterruptedException, TimeoutException {
+ // handle altering client/user quota configs
+ Map oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames);
+
+ List invalidConfigs = configsToBeDeleted.stream()
+ .filter(config -> !oldConfig.containsKey(config))
+ .toList();
+ if (!invalidConfigs.isEmpty())
+ throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));
+
+ List alterEntityTypes = entityTypes.stream()
+ .map(type -> {
+ if (USER_TYPE.equals(type)) {
+ return ClientQuotaEntity.USER;
+ } else if (CLIENT_TYPE.equals(type)) {
+ return ClientQuotaEntity.CLIENT_ID;
+ } else if (IP_TYPE.equals(type)) {
+ return ClientQuotaEntity.IP;
+ } else {
+ throw new IllegalArgumentException("Unexpected entity type: " + type);
+ }
+ })
+ .toList();
+
+ List alterEntityNames = entityNames.stream()
+ .map(en -> en.isEmpty() ? null : en)
+ .toList();
+
+ // Explicitly populate a HashMap to ensure nulls are recorded properly.
+ Map alterEntityMap = new HashMap<>();
+ for (int i = 0; i < alterEntityTypes.size(); i++) {
+ alterEntityMap.put(alterEntityTypes.get(i), alterEntityNames.get(i));
+ }
+ ClientQuotaEntity entity = new ClientQuotaEntity(alterEntityMap);
+
+ AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions().validateOnly(false);
+
+ List addOps = configsToBeAddedMap.entrySet().stream()
+ .map(entry -> {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ double doubleValue;
+ try {
+ doubleValue = Double.parseDouble(value);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Cannot parse quota configuration value for " + key + ": " + value);
+ }
+ return new ClientQuotaAlteration.Op(key, doubleValue);
+ })
+ .toList();
+
+ List deleteOps = configsToBeDeleted.stream()
+ .map(key -> new ClientQuotaAlteration.Op(key, null))
+ .toList();
+
+ Collection alterOps = Stream.concat(addOps.stream(), deleteOps.stream()).toList();
+
+ adminClient.alterClientQuotas(Collections.singleton(new ClientQuotaAlteration(entity, alterOps)), alterOptions)
+ .all().get(60, TimeUnit.SECONDS);
+ }
+
+ static void describeConfig(Admin adminClient, ConfigCommandOptions opts) throws Exception {
+ List entityTypes = opts.entityTypes();
+ List entityNames = opts.entityNames();
+ boolean describeAll = opts.options.has(opts.allOpt);
+
+ String entityType = entityTypes.get(0);
+ if (TOPIC_TYPE.equals(entityType) || BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType) ||
+ CLIENT_METRICS_TYPE.equals(entityType) || GROUP_TYPE.equals(entityType)) {
+ describeResourceConfig(adminClient, entityType, entityNames.isEmpty() ? Optional.empty() : Optional.of(entityNames.get(0)), describeAll);
+ } else if (USER_TYPE.equals(entityType) || CLIENT_TYPE.equals(entityType)) {
+ describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames);
+ } else if (IP_TYPE.equals(entityType)) {
+ describeQuotaConfigs(adminClient, entityTypes, entityNames);
+ } else {
+ throw new IllegalArgumentException("Invalid entity type: " + entityType);
+ }
+ }
+
+ private static void describeResourceConfig(Admin adminClient, String entityType, Optional entityName, boolean describeAll) throws Exception {
+ if (!describeAll) {
+ if (entityName.isPresent()) {
+ String name = entityName.get();
+ String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
+ if (TOPIC_TYPE.equals(entityType)) {
+ Topic.validate(name);
+ if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().contains(name)) {
+ System.out.println("The " + entityTypeSingular + " '" + name + "' doesn't exist and doesn't have dynamic config.");
+ return;
+ }
+ } else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
+ if (adminClient.describeCluster().nodes().get().stream().anyMatch(n -> n.idString().equals(name))) {
+ // valid broker id
+ } else if (BROKER_DEFAULT_ENTITY_NAME.equals(name)) {
+ // default broker configs
+ } else {
+ System.out.println("The " + entityTypeSingular + " '" + name + "' doesn't exist and doesn't have dynamic config.");
+ return;
+ }
+ } else if (CLIENT_METRICS_TYPE.equals(entityType)) {
+ if (adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get()
+ .stream().noneMatch(resource -> resource.name().equals(name))) {
+ System.out.println("The " + entityTypeSingular + " '" + name + "' doesn't exist and doesn't have dynamic config.");
+ return;
+ }
+ } else if (GROUP_TYPE.equals(entityType)) {
+ boolean noMatchInGroups = adminClient.listGroups().all().get().stream()
+ .noneMatch(group -> group.groupId().equals(name));
+ boolean noMatchInResources = listGroupConfigResources(adminClient)
+ .map(resources -> resources.stream().noneMatch(resource -> resource.name().equals(name)))
+ .orElse(false);
+ if (noMatchInGroups && noMatchInResources) {
+ System.out.println("The " + entityTypeSingular + " '" + name + "' doesn't exist and doesn't have dynamic config.");
+ return;
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid entity type: " + entityType);
+ }
+ }
+ }
+
+ List entities;
+ if (entityName.isPresent()) {
+ entities = List.of(entityName.get());
+ } else {
+ if (TOPIC_TYPE.equals(entityType)) {
+ entities = new ArrayList<>(adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
+ } else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
+ List brokerIds = adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().stream()
+ .map(Node::idString)
+ .collect(Collectors.toList());
+ brokerIds.add(BROKER_DEFAULT_ENTITY_NAME);
+ entities = brokerIds;
+ } else if (CLIENT_METRICS_TYPE.equals(entityType)) {
+ entities = adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get().stream()
+ .map(ConfigResource::name)
+ .toList();
+ } else if (GROUP_TYPE.equals(entityType)) {
+ Set groupIds = adminClient.listGroups().all().get().stream()
+ .map(GroupListing::groupId)
+ .collect(Collectors.toSet());
+ Set groupResources = listGroupConfigResources(adminClient)
+ .map(resources -> resources.stream()
+ .map(ConfigResource::name)
+ .collect(Collectors.toSet()))
+ .orElse(Set.of());
+ Set combined = new HashSet<>(groupIds);
+ combined.addAll(groupResources);
+ entities = new ArrayList<>(combined);
+ } else {
+ throw new IllegalArgumentException("Invalid entity type: " + entityType);
+ }
+ }
+
+ for (String entity : entities) {
+ if (BROKER_DEFAULT_ENTITY_NAME.equals(entity)) {
+ System.out.println("Default configs for " + entityType + " in the cluster are:");
+ } else {
+ String configSourceStr = describeAll ? "All" : "Dynamic";
+ String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
+ System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:");
+ }
+ getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> {
+ String synonyms = entry.synonyms().stream()
+ .map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value())
+ .collect(Collectors.joining(", ", "{", "}"));
+ System.out.println(" " + entry.name() + "=" + entry.value() + " sensitive=" + entry.isSensitive() + " synonyms=" + synonyms);
+ });
+ }
+ }
+
+ private static void alterResourceConfig(Admin adminClient, String entityTypeHead, String entityNameHead, List configsToBeDeleted, Map configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
+ Map oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, false, false)
+ .stream()
+ .collect(Collectors.toMap(ConfigEntry::name, entry -> entry));
+
+ // fail the command if any of the configs to be deleted does not exist
+ List invalidConfigs = configsToBeDeleted.stream()
+ .filter(config -> !oldConfig.containsKey(config))
+ .toList();
+ if (!invalidConfigs.isEmpty())
+ throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));
+
+ ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+ List addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
+ List deleteEntries = configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)).toList();
+ Collection alterEntries = Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
+ adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions).all().get(60, TimeUnit.SECONDS);
+ }
+
+ static void validateBrokerId(String entityName, String entityType) {
+ try {
+ Integer.parseInt(entityName);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("The entity name for " + entityType + " must be a valid integer broker id, found: " + entityName);
+ }
+ }
+
+ private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+ ConfigResource.Type configResourceType;
+ Optional dynamicConfigSource;
+
+ if (TOPIC_TYPE.equals(entityType)) {
+ if (!entityName.isEmpty()) {
+ Topic.validate(entityName);
+ }
+ configResourceType = ConfigResource.Type.TOPIC;
+ dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
+ } else if (BROKER_TYPE.equals(entityType)) {
+ if (BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
+ configResourceType = ConfigResource.Type.BROKER;
+ dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
+ } else {
+ validateBrokerId(entityName, entityType);
+ configResourceType = ConfigResource.Type.BROKER;
+ dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG);
+ }
+ } else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
+ if (!entityName.isEmpty()) {
+ validateBrokerId(entityName, entityType);
+ }
+ configResourceType = ConfigResource.Type.BROKER_LOGGER;
+ dynamicConfigSource = Optional.empty();
+ } else if (CLIENT_METRICS_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.CLIENT_METRICS;
+ dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG);
+ } else if (GROUP_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.GROUP;
+ dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG);
+ } else {
+ throw new IllegalArgumentException("Invalid entity type: " + entityType);
+ }
+
+ Optional configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource;
+
+ ConfigResource configResource = new ConfigResource(configResourceType, entityName);
+ DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
+ Map configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
+ .all().get(30, TimeUnit.SECONDS);
+
+ return configs.get(configResource).entries().stream()
+ .filter(entry -> configSourceFilter.isEmpty() || entry.source() == configSourceFilter.get())
+ .sorted(Comparator.comparing(ConfigEntry::name))
+ .toList();
+ }
+
+ private static void describeQuotaConfigs(Admin adminClient, List entityTypes, List entityNames) throws ExecutionException, InterruptedException, TimeoutException {
+ Map> quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames);
+ quotaConfigs.forEach((entity, entries) -> {
+ Map entityEntries = entity.entries();
+
+ Function> entitySubstr = entityType -> {
+ String name = entityEntries.get(entityType);
+ if (name == null) {
+ return Optional.empty();
+ }
+ String typeStr = switch (entityType) {
+ case ClientQuotaEntity.USER -> "user-principal";
+ case ClientQuotaEntity.CLIENT_ID -> "client-id";
+ case ClientQuotaEntity.IP -> "ip";
+ default -> throw new IllegalArgumentException("Unknown entity type: " + entityType);
+ };
+ String result = name.isEmpty() ? "the default " + typeStr : typeStr + " '" + name + "'";
+ return Optional.of(result);
+ };
+
+ String entityStr = Stream.of(
+ entitySubstr.apply(ClientQuotaEntity.USER),
+ entitySubstr.apply(ClientQuotaEntity.CLIENT_ID),
+ entitySubstr.apply(ClientQuotaEntity.IP)
+ )
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.joining(", "));
+
+ String entriesStr = entries.entrySet().stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .collect(Collectors.joining(", "));
+
+ System.out.println("Quota configs for " + entityStr + " are " + entriesStr);
+ });
+ }
+
+ private static void describeClientQuotaAndUserScramCredentialConfigs(Admin adminClient, List entityTypes, List entityNames) throws ExecutionException, InterruptedException, TimeoutException {
+ describeQuotaConfigs(adminClient, entityTypes, entityNames);
+ // we describe user SCRAM credentials only when we are not describing client information
+ // and we are not given either --entity-default or --user-defaults
+ if (!entityTypes.contains(CLIENT_TYPE) && !entityNames.contains("")) {
+ DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials(entityNames);
+ result.users().get(30, TimeUnit.SECONDS).forEach(user -> {
+ try {
+ UserScramCredentialsDescription description = result.description(user).get(30, TimeUnit.SECONDS);
+ String descriptionText = description.credentialInfos().stream()
+ .map(info -> info.mechanism().mechanismName() + "=iterations=" + info.iterations())
+ .collect(Collectors.joining(", "));
+ System.out.println("SCRAM credential configs for user-principal '" + user + "' are " + descriptionText);
+ } catch (Exception e) {
+ System.out.println("Error retrieving SCRAM credential configs for user-principal '" + user + "': " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ });
+ }
+ }
+
+ private static Map getClientQuotasConfig(Admin adminClient, List entityTypes, List entityNames) throws ExecutionException, InterruptedException, TimeoutException {
+ if (entityTypes.size() != entityNames.size())
+ throw new IllegalArgumentException("Exactly one entity name must be specified for every entity type");
+ return getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
+ .values()
+ .stream()
+ .findFirst()
+ .orElse(Map.of());
+ }
+
+ private static Map> getAllClientQuotasConfigs(Admin adminClient, List entityTypes, List entityNames) throws ExecutionException, InterruptedException, TimeoutException {
+ int maxSize = Math.max(entityTypes.size(), entityNames.size());
+ List components = new ArrayList<>();
+
+ for (int i = 0; i < maxSize; i++) {
+ Optional entityTypeOpt = i < entityTypes.size() ? Optional.of(entityTypes.get(i)) : Optional.empty();
+ Optional entityNameOpt = i < entityNames.size() ? Optional.of(entityNames.get(i)) : Optional.empty();
+
+ String entityType;
+ if (entityTypeOpt.isPresent()) {
+ String typeValue = entityTypeOpt.get();
+ if (USER_TYPE.equals(typeValue)) {
+ entityType = ClientQuotaEntity.USER;
+ } else if (CLIENT_TYPE.equals(typeValue)) {
+ entityType = ClientQuotaEntity.CLIENT_ID;
+ } else if (IP_TYPE.equals(typeValue)) {
+ entityType = ClientQuotaEntity.IP;
+ } else {
+ throw new IllegalArgumentException("Unexpected entity type " + typeValue);
+ }
+ } else {
+ throw new IllegalArgumentException("More entity names specified than entity types");
+ }
+
+ ClientQuotaFilterComponent component;
+ if (entityNameOpt.isEmpty()) {
+ component = ClientQuotaFilterComponent.ofEntityType(entityType);
+ } else if (entityNameOpt.get().isEmpty()) {
+ component = ClientQuotaFilterComponent.ofDefaultEntity(entityType);
+ } else {
+ component = ClientQuotaFilterComponent.ofEntity(entityType, entityNameOpt.get());
+ }
+ components.add(component);
+ }
+
+ return adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components)).entities().get(30, TimeUnit.SECONDS);
+ }
+
+ private static Optional> listGroupConfigResources(Admin adminClient) throws Exception {
+ try {
+ return Optional.of(adminClient.listConfigResources(Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get());
+ } catch (ExecutionException ee) {
+ // (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersionException and ClusterAuthorizationException as None
+ if (ee.getCause() instanceof UnsupportedVersionException) return Optional.empty();
+ if (ee.getCause() instanceof ClusterAuthorizationException) return Optional.empty();
+ else throw (Exception) ee.getCause();
+ }
+ }
+
+ static class ConfigCommandOptions extends CommandDefaultOptions {
+ private final OptionSpec bootstrapServerOpt;
+ private final OptionSpec bootstrapControllerOpt;
+ private final OptionSpec commandConfigOpt;
+ private final OptionSpec alterOpt;
+ private final OptionSpec describeOpt;
+ private final OptionSpec allOpt;
+ private final OptionSpec entityType;
+ private final OptionSpec entityName;
+ private final OptionSpec entityDefault;
+ private final OptionSpec addConfig;
+ private final OptionSpec addConfigFile;
+ private final OptionSpec deleteConfig;
+ private final OptionSpec topic;
+ private final OptionSpec client;
+ private final OptionSpec clientDefaults;
+ private final OptionSpec user;
+ private final OptionSpec userDefaults;
+ private final OptionSpec broker;
+ private final OptionSpec brokerDefaults;
+ private final OptionSpec brokerLogger;
+ private final OptionSpec ipDefaults;
+ private final OptionSpec ip;
+ private final OptionSpec group;
+ private final OptionSpec clientMetrics;
+
+ private static String formatConfigNames(Collection names) {
+ String nl = System.lineSeparator();
+ return names.stream()
+ .sorted()
+ .map(name -> "\t" + name)
+ .collect(Collectors.joining(nl, nl, nl));
+ }
+
+ ConfigCommandOptions(String[] args) {
+ super(args);
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
+ .withRequiredArg()
+ .describedAs("server to connect to")
+ .ofType(String.class);
+ bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The Kafka controllers to connect to.")
+ .withRequiredArg()
+ .describedAs("controller to connect to")
+ .ofType(String.class);
+ commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
+ "This is used only with --bootstrap-server option for describing and altering broker configs.")
+ .withRequiredArg()
+ .describedAs("command config property file")
+ .ofType(String.class);
+ alterOpt = parser.accepts("alter", "Alter the configuration for the entity.");
+ describeOpt = parser.accepts("describe", "List configs for the given entity.");
+ allOpt = parser.accepts("all", "List all configs for the given entity, including static configs if available.");
+
+ entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers/ips/client-metrics/groups)")
+ .withRequiredArg()
+ .ofType(String.class);
+ entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id/ip/client metrics/group id)")
+ .withRequiredArg()
+ .ofType(String.class);
+ entityDefault = parser.accepts("entity-default", "Default entity name for clients/users/brokers/ips (applies to corresponding entity type)");
+
+ addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
+ "For entity-type '" + TOPIC_TYPE + "': " + formatConfigNames(LogConfig.nonInternalConfigNames()) +
+ "For entity-type '" + BROKER_TYPE + "': " + formatConfigNames(DynamicConfig.Broker.names()) +
+ "For entity-type '" + USER_TYPE + "': " + formatConfigNames(QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names()) +
+ "For entity-type '" + CLIENT_TYPE + "': " + formatConfigNames(QuotaConfig.userAndClientQuotaConfigs().names()) +
+ "For entity-type '" + IP_TYPE + "': " + formatConfigNames(QuotaConfig.ipConfigs().names()) +
+ "For entity-type '" + CLIENT_METRICS_TYPE + "': " + formatConfigNames(ClientMetricsConfigs.configNames()) +
+ "For entity-type '" + GROUP_TYPE + "': " + formatConfigNames(GroupConfig.configNames()) +
+ "Entity types '" + USER_TYPE + "' and '" + CLIENT_TYPE + "' may be specified together to update config for clients of a specific user.")
+ .withRequiredArg()
+ .ofType(String.class);
+ addConfigFile = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
+ .withRequiredArg()
+ .ofType(String.class);
+ deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
+ .withRequiredArg()
+ .ofType(String.class)
+ .withValuesSeparatedBy(',');
+ topic = parser.accepts("topic", "The topic's name.")
+ .withRequiredArg()
+ .ofType(String.class);
+ client = parser.accepts("client", "The client's ID.")
+ .withRequiredArg()
+ .ofType(String.class);
+ clientDefaults = parser.accepts("client-defaults", "The config defaults for all clients.");
+ user = parser.accepts("user", "The user's principal name.")
+ .withRequiredArg()
+ .ofType(String.class);
+ userDefaults = parser.accepts("user-defaults", "The config defaults for all users.");
+ broker = parser.accepts("broker", "The broker's ID.")
+ .withRequiredArg()
+ .ofType(String.class);
+ brokerDefaults = parser.accepts("broker-defaults", "The config defaults for all brokers.");
+ brokerLogger = parser.accepts("broker-logger", "The broker's ID for its logger config.")
+ .withRequiredArg()
+ .ofType(String.class);
+ ipDefaults = parser.accepts("ip-defaults", "The config defaults for all IPs.");
+ ip = parser.accepts("ip", "The IP address.")
+ .withRequiredArg()
+ .ofType(String.class);
+ group = parser.accepts("group", "The group's ID.")
+ .withRequiredArg()
+ .ofType(String.class);
+ clientMetrics = parser.accepts("client-metrics", "The client metrics config resource name.")
+ .withRequiredArg()
+ .ofType(String.class);
+
+ options = parser.parse(args);
+ }
+
+ private record EntityFlag(OptionSpec> spec, String type) { }
+
+ private List entityFlags() {
+ return List.of(
+ new EntityFlag(topic, TOPIC_TYPE),
+ new EntityFlag(client, CLIENT_TYPE),
+ new EntityFlag(user, USER_TYPE),
+ new EntityFlag(broker, BROKER_TYPE),
+ new EntityFlag(brokerLogger, BROKER_LOGGER_CONFIG_TYPE),
+ new EntityFlag(ip, IP_TYPE),
+ new EntityFlag(clientMetrics, CLIENT_METRICS_TYPE),
+ new EntityFlag(group, GROUP_TYPE)
+ );
+ }
+
+ private List entityDefaultsFlags() {
+ return List.of(
+ new EntityFlag(clientDefaults, CLIENT_TYPE),
+ new EntityFlag(userDefaults, USER_TYPE),
+ new EntityFlag(brokerDefaults, BROKER_TYPE),
+ new EntityFlag(ipDefaults, IP_TYPE)
+ );
+ }
+
+ List entityTypes() {
+ List fromEntityType = new ArrayList<>(options.valuesOf(entityType));
+ List fromFlags = Stream.concat(entityFlags().stream(), entityDefaultsFlags().stream())
+ .filter(entity -> options.has(entity.spec()))
+ .map(EntityFlag::type)
+ .toList();
+ List result = new ArrayList<>(fromEntityType);
+ result.addAll(fromFlags);
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ List entityNames() {
+ Iterator namesIterator = options.valuesOf(entityName).iterator();
+ List fromSpecs = options.specs().stream()
+ .filter(spec -> spec.options().contains("entity-name") || spec.options().contains("entity-default"))
+ .map(spec -> spec.options().contains("entity-name") ? namesIterator.next() : "")
+ .toList();
+
+ List fromEntityFlags = entityFlags().stream()
+ .filter(entity -> options.has(entity.spec()))
+ .map(entity -> options.valueOf((OptionSpec) entity.spec()))
+ .toList();
+
+ List fromDefaultFlags = entityDefaultsFlags().stream()
+ .filter(entity -> options.has(entity.spec()))
+ .map(entity -> "")
+ .toList();
+
+ return Stream.of(fromSpecs, fromEntityFlags, fromDefaultFlags)
+ .flatMap(List::stream)
+ .toList();
+ }
+
+ public void checkArgs() {
+ // should have exactly one action
+ long actions = Stream.of(alterOpt, describeOpt).filter(options::has).count();
+ if (actions != 1)
+ CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --describe, --alter");
+ // check required args
+ CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, describeOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, alterOpt, addConfig, deleteConfig);
+
+ List entityTypeVals = entityTypes();
+ long distinctCount = entityTypeVals.stream().distinct().count();
+ if (entityTypeVals.size() != distinctCount) {
+ Set seen = new HashSet<>();
+ List duplicates = entityTypeVals.stream()
+ .filter(type -> !seen.add(type))
+ .distinct()
+ .toList();
+ throw new IllegalArgumentException("Duplicate entity type(s) specified: " + String.join(",", duplicates));
+ }
+
+ List allowedEntityTypes;
+ if (options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt)) {
+ allowedEntityTypes = BROKER_SUPPORTED_CONFIG_TYPES;
+ } else {
+ throw new IllegalArgumentException("Either --bootstrap-server or --bootstrap-controller must be specified.");
+ }
+
+ String connectOptString = "--bootstrap-server or --bootstrap-controller";
+ entityTypeVals.forEach(entityTypeVal -> {
+ if (!allowedEntityTypes.contains(entityTypeVal))
+ throw new IllegalArgumentException("Invalid entity type " + entityTypeVal + ", the entity type must be one of " + String.join(", ", allowedEntityTypes) + " with a " + connectOptString + " argument");
+ });
+ if (entityTypeVals.isEmpty())
+ throw new IllegalArgumentException("At least one entity type must be specified");
+ else if (entityTypeVals.size() > 1 && !(Set.copyOf(entityTypeVals).equals(Set.of(USER_TYPE, CLIENT_TYPE))))
+ throw new IllegalArgumentException("Only '" + USER_TYPE + "' and '" + CLIENT_TYPE + "' entity types may be specified together");
+
+ if ((options.has(entityName) || options.has(entityType) || options.has(entityDefault)) &&
+ Stream.concat(entityFlags().stream(), entityDefaultsFlags().stream())
+ .anyMatch(entity -> options.has(entity.spec())))
+ throw new IllegalArgumentException("--entity-{type,name,default} should not be used in conjunction with specific entity flags");
+
+ List entityNamesVals = entityNames();
+ boolean hasEntityName = entityNamesVals.stream().anyMatch(name -> !name.isEmpty());
+ boolean hasEntityDefault = entityNamesVals.stream().anyMatch(String::isEmpty);
+
+ int numConnectOptions = (options.has(bootstrapServerOpt) ? 1 : 0) + (options.has(bootstrapControllerOpt) ? 1 : 0);
+ if (numConnectOptions > 1)
+ throw new IllegalArgumentException("Only one of --bootstrap-server or --bootstrap-controller can be specified");
+ if (hasEntityName && (entityTypeVals.contains(BROKER_TYPE) || entityTypeVals.contains(BROKER_LOGGER_CONFIG_TYPE))) {
+ Stream.of(entityName, broker, brokerLogger)
+ .filter(options::has)
+ .map(options::valueOf)
+ .forEach(brokerId -> {
+ try {
+ Integer.parseInt(brokerId);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("The entity name for " + entityTypeVals.get(0) + " must be a valid integer broker id, but it is: " + brokerId);
+ }
+ });
+ }
+
+ if (hasEntityName && entityTypeVals.contains(IP_TYPE)) {
+ Stream.of(entityName, ip)
+ .filter(options::has)
+ .map(options::valueOf)
+ .forEach(ipEntity -> {
+ if (!isValidIpEntity(ipEntity))
+ throw new IllegalArgumentException("The entity name for " + entityTypeVals.get(0) + " must be a valid IP or resolvable host, but it is: " + ipEntity);
+ });
+ }
+
+ if (options.has(describeOpt)) {
+ if (!(entityTypeVals.contains(USER_TYPE) ||
+ entityTypeVals.contains(CLIENT_TYPE) ||
+ entityTypeVals.contains(BROKER_TYPE) ||
+ entityTypeVals.contains(IP_TYPE)) && options.has(entityDefault)) {
+ throw new IllegalArgumentException("--entity-default must not be specified with --describe of " + String.join(",", entityTypeVals));
+ }
+
+ if (entityTypeVals.contains(BROKER_LOGGER_CONFIG_TYPE) && !hasEntityName)
+ throw new IllegalArgumentException("An entity name must be specified with --describe of " + String.join(",", entityTypeVals));
+ }
+
+ if (options.has(alterOpt)) {
+ if (entityTypeVals.contains(USER_TYPE) ||
+ entityTypeVals.contains(CLIENT_TYPE) ||
+ entityTypeVals.contains(BROKER_TYPE) ||
+ entityTypeVals.contains(IP_TYPE)) {
+ if (!hasEntityName && !hasEntityDefault)
+ throw new IllegalArgumentException("An entity-name or default entity must be specified with --alter of users, clients, brokers or ips");
+ } else if (!hasEntityName)
+ throw new IllegalArgumentException("An entity name must be specified with --alter of " + String.join(",", entityTypeVals));
+
+ boolean isAddConfigPresent = options.has(addConfig);
+ boolean isAddConfigFilePresent = options.has(addConfigFile);
+ boolean isDeleteConfigPresent = options.has(deleteConfig);
+
+ if (isAddConfigPresent && isAddConfigFilePresent)
+ throw new IllegalArgumentException("Only one of --add-config or --add-config-file must be specified");
+
+ if (!isAddConfigPresent && !isAddConfigFilePresent && !isDeleteConfigPresent)
+ throw new IllegalArgumentException("At least one of --add-config, --add-config-file, or --delete-config must be specified with --alter");
+ }
+ }
+ }
+
+ private static boolean isValidIpEntity(String ip) {
+ try {
+ InetAddress.getByName(ip);
+ return true;
+ } catch (UnknownHostException uhe) {
+ return false;
+ }
+ }
+
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 9cb8ef7f3b12d..7fca102f8a627 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools;
-import kafka.admin.ConfigCommand;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
@@ -574,12 +573,12 @@ public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
}
@ClusterTest
- public void testUpdateInvalidBrokerConfigs() throws InterruptedException {
+ public void testUpdateInvalidBrokerConfigs() throws Exception {
updateAndCheckInvalidBrokerConfig(Optional.empty());
updateAndCheckInvalidBrokerConfig(Optional.of(String.valueOf((cluster.brokers().entrySet().iterator().next().getKey()))));
}
- private void updateAndCheckInvalidBrokerConfig(Optional brokerIdOrDefault) throws InterruptedException {
+ private void updateAndCheckInvalidBrokerConfig(Optional brokerIdOrDefault) throws Exception {
List alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
try (Admin client = cluster.admin()) {
alterConfigWithAdmin(client, brokerIdOrDefault, Map.of("invalid", "2"), alterOpts);
@@ -596,7 +595,7 @@ private void updateAndCheckInvalidBrokerConfig(Optional brokerIdOrDefaul
last.set(describeResult);
return describeResult.contains("invalid=null");
- }, 5000, () -> "Dynamic broker config was not visible within 5s (missing 'invalid=null').\n" +
+ }, 5000, () -> "Dynamic broker config was not visible within 5s (missing 'invalid=null').\n" +
"Last describe output:\n" + last.get());
assertTrue(last.get().contains("sensitive=true"));
@@ -627,7 +626,7 @@ public void testUpdateInvalidTopicConfigs() throws ExecutionException, Interrupt
// Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads.
@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154"),
})
- public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() throws Exception {
try (Admin client = cluster.admin()) {
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(
toArray(List.of("--bootstrap-server", cluster.bootstrapServers(),
@@ -702,6 +701,18 @@ public void testDescribeNonExistentConfigResource() {
});
}
+ @ClusterTest
+ public void testIntervalMsParser(ClusterInstance clusterInstance) {
+ List alterOpts = List.of("--bootstrap-server", clusterInstance.bootstrapServers(),
+ "--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
+ try (Admin client = clusterInstance.admin()) {
+ ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(alterOpts.toArray(String[]::new));
+
+ Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));
+ assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
+ }
+ }
+
private void assertNonZeroStatusExit(Stream args, Consumer checkErrOut) {
AtomicReference exitStatus = new AtomicReference<>();
Exit.setExitProcedure((status, __) -> {
@@ -762,7 +773,7 @@ private void alterAndVerifyBrokerLoggerConfig(Admin client,
verifyBrokerLoggerConfig(client, brokerId, config);
}
- private void alterConfigWithAdmin(Admin client, Optional resourceName, Map config, List alterOpts) {
+ private void alterConfigWithAdmin(Admin client, Optional resourceName, Map config, List alterOpts) throws Exception {
String configStr = transferConfigMapToString(config);
List bootstrapOpts = quorumArgs().toList();
ConfigCommand.ConfigCommandOptions addOpts =
@@ -774,7 +785,7 @@ private void alterConfigWithAdmin(Admin client, Optional resourceName, M
ConfigCommand.alterConfig(client, addOpts);
}
- private void alterConfigWithAdmin(Admin client, Map config, List alterOpts) {
+ private void alterConfigWithAdmin(Admin client, Map config, List alterOpts) throws Exception {
String configStr = transferConfigMapToString(config);
List bootstrapOpts = quorumArgs().toList();
ConfigCommand.ConfigCommandOptions addOpts =
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index f0e392c747b08..a0743d5ba00d0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.tools;
-import kafka.admin.ConfigCommand;
-
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterClientQuotasOptions;
import org.apache.kafka.clients.admin.AlterClientQuotasResult;
@@ -74,7 +72,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -137,76 +134,76 @@ public static void assertNonZeroStatusExit(String... args) {
}
@Test
- public void shouldParseArgumentsForClientsEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForClientsEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "clients");
}
@Test
- public void shouldParseArgumentsForClientsEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForClientsEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "clients");
}
@Test
- public void shouldParseArgumentsForUsersEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForUsersEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "users");
}
@Test
- public void shouldParseArgumentsForUsersEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForUsersEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "users");
}
@Test
- public void shouldParseArgumentsForTopicsEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForTopicsEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "topics");
}
@Test
- public void shouldParseArgumentsForTopicsEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForTopicsEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "topics");
}
@Test
- public void shouldParseArgumentsForBrokersEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForBrokersEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "brokers");
}
@Test
- public void shouldParseArgumentsForBrokersEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForBrokersEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "brokers");
}
@Test
- public void shouldParseArgumentsForBrokerLoggersEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForBrokerLoggersEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "broker-loggers");
}
@Test
- public void shouldParseArgumentsForBrokerLoggersEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForBrokerLoggersEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "broker-loggers");
}
@Test
- public void shouldParseArgumentsForIpEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForIpEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "ips");
}
@Test
- public void shouldParseArgumentsForIpEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForIpEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "ips");
}
@Test
- public void shouldParseArgumentsForGroupEntityTypeWithBrokerBootstrap() {
+ public void shouldParseArgumentsForGroupEntityTypeWithBrokerBootstrap() throws IOException {
testArgumentParse(BROKER_BOOTSTRAP, "groups");
}
@Test
- public void shouldParseArgumentsForGroupEntityTypeWithControllerBootstrap() {
+ public void shouldParseArgumentsForGroupEntityTypeWithControllerBootstrap() throws IOException {
testArgumentParse(CONTROLLER_BOOTSTRAP, "groups");
}
- public void testArgumentParse(List bootstrapArguments, String entityType) {
+ public void testArgumentParse(List bootstrapArguments, String entityType) throws IOException {
String shortFlag = "--" + entityType.substring(0, entityType.length() - 1);
String connectOpts1 = bootstrapArguments.get(0);
String connectOpts2 = bootstrapArguments.get(1);
@@ -285,9 +282,9 @@ public void testArgumentParse(List bootstrapArguments, String entityType
assertEquals("b", addedProps.getProperty("a"));
assertEquals("d", addedProps.getProperty("c"));
- Seq deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts);
+ List deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts);
assertEquals(1, deletedProps.size());
- assertEquals("a", deletedProps.apply(0));
+ assertEquals("a", deletedProps.get(0));
createOpts = new ConfigCommand.ConfigCommandOptions(toArray(connectOpts1, connectOpts2,
"--entity-name", "1",
@@ -429,8 +426,8 @@ public void testParseConfigsToBeAddedForAddConfigFile() throws IOException {
public void testExpectedEntityTypeNames(List expectedTypes, List expectedNames, List connectOpts, String... args) {
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(List.of(connectOpts.get(0), connectOpts.get(1), "--describe"), List.of(args)));
createOpts.checkArgs();
- assertEquals(seq(expectedTypes), createOpts.entityTypes().toSeq());
- assertEquals(seq(expectedNames), createOpts.entityNames().toSeq());
+ assertEquals(expectedTypes, createOpts.entityTypes());
+ assertEquals(expectedNames, createOpts.entityNames());
}
@Test
@@ -539,7 +536,7 @@ public void shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer() {
verifyAlterCommandFails(invalidProp, concat(ipEntityOpts, List.of("--delete-config", "some_config=10")));
}
- private void verifyDescribeQuotas(List describeArgs, ClientQuotaFilter expectedFilter) {
+ private void verifyDescribeQuotas(List describeArgs, ClientQuotaFilter expectedFilter) throws Exception {
ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray(List.of("--bootstrap-server", "localhost:9092",
"--describe"), describeArgs));
KafkaFutureImpl