Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1fcfe35
completed the feature
m1a2st Nov 27, 2025
cc33873
updated the test
m1a2st Nov 27, 2025
f5f78e7
fix test
m1a2st Nov 28, 2025
59d3be2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 2, 2025
c8d913f
temp version
m1a2st Dec 2, 2025
f8d01df
temp version
m1a2st Dec 2, 2025
a165eb5
test in the kafka CI
m1a2st Dec 3, 2025
48184c2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 3, 2025
1ff2a4c
addressed by comments
m1a2st Dec 3, 2025
da21c8b
fix the fail test
m1a2st Dec 3, 2025
fdbb81c
add new test for ConfigAdminManagerTest
m1a2st Dec 3, 2025
a0198d9
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 8, 2025
d6ae031
addressed by comments
m1a2st Dec 8, 2025
138e8e8
update the addConfigChangesOrHandleDuplicate logic
m1a2st Dec 8, 2025
72e787b
update the related logic
m1a2st Dec 8, 2025
bfd3860
revert the error logic
m1a2st Dec 8, 2025
2a4c9ea
remove unused exception throwing
m1a2st Dec 8, 2025
d388063
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 20, 2025
6df68ba
resolve conflict
m1a2st Dec 20, 2025
bb8c33a
add new validator
m1a2st Dec 20, 2025
16ca216
fix the fail test
m1a2st Dec 21, 2025
ca12d22
temp
m1a2st Dec 21, 2025
ac4b124
temp patch
m1a2st Dec 24, 2025
b07102e
revert unused change
m1a2st Dec 24, 2025
500b7ae
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 24, 2025
8d1cf15
check the resource operation type
m1a2st Dec 25, 2025
d896bfb
check null value
m1a2st Dec 25, 2025
a690de7
check delete opType
m1a2st Dec 25, 2025
e7251e2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 25, 2025
374c301
Merge branch 'trunk' into KAFKA-19931
m1a2st Jan 4, 2026
59264ff
update the logic
m1a2st Jan 4, 2026
a350c1c
Merge branch 'trunk' into KAFKA-19931
m1a2st Feb 1, 2026
63d8513
resolve conflict
m1a2st Feb 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 77 additions & 39 deletions core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import scala.jdk.CollectionConverters._
* KIP-412 added support for changing log4j log levels via IncrementalAlterConfigs, but
* not via the original AlterConfigs. In retrospect, this would have been better off as a
* separate RPC, since the semantics are quite different. In particular, KIP-226 configs
* are stored durably and persist across broker restarts, but KIP-412 log4j levels do not.
* are stored durably and persist across broker restarts, but KIP-412 log4j levels do not.
* However, we have to handle it here now in order to maintain compatibility.
*
* Configuration processing is split into two parts.
Expand Down Expand Up @@ -112,48 +112,33 @@ class ConfigAdminManager(nodeId: Int,
})
request.resources().forEach(resource => {
if (!results.containsKey(resource)) {
val resourceType = ConfigResource.Type.forId(resource.resourceType())
val configResource = new ConfigResource(resourceType, resource.resourceName())
try {
if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
throw new InvalidRequestException("Error due to duplicate config keys")
}
val nullUpdates = new util.ArrayList[String]()
resource.configs().forEach { config =>
if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
config.value() == null) {
nullUpdates.add(config.name())
processConfigResource(
resource,
() => {
runtimeLoggerManager.applyChangesForResource(
authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME),
request.validateOnly(),
resource)
results.put(resource, ApiError.NONE)
},
configResource => {
// The resource name must be either blank (if setting a cluster config) or
// the ID of this specific broker.
if (configResource.name().nonEmpty) {
validateResourceNameIsCurrentNodeId(resource.resourceName())
}
}
if (!nullUpdates.isEmpty) {
throw new InvalidRequestException("Null value not supported for : " +
String.join(", ", nullUpdates))
}
resourceType match {
case BROKER_LOGGER =>
runtimeLoggerManager.applyChangesForResource(
authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME),
request.validateOnly(),
resource)
results.put(resource, ApiError.NONE)
case BROKER =>
// The resource name must be either blank (if setting a cluster config) or
// the ID of this specific broker.
if (configResource.name().nonEmpty) {
validateResourceNameIsCurrentNodeId(resource.resourceName())
}
validateBrokerConfigChange(resource, configResource)
case TOPIC | CLIENT_METRICS | GROUP =>
// Nothing to do.
case _ =>
throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}")
}
} catch {
case t: Throwable =>
validateBrokerConfigChange(resource, configResource)
},
_ => {
// Nothing to do for TOPIC, CLIENT_METRICS or GROUP type in Broker,
// and UNKNOWN type is already handled in validateUnknownConfigTypeError.
},
(configResource, t) => {
val err = ApiError.fromThrowable(t)
error(s"Error preprocessing incrementalAlterConfigs request on $configResource", t)
results.put(resource, err)
}
}
)
}
})
results
Expand Down Expand Up @@ -451,4 +436,57 @@ object ConfigAdminManager {
}
}
}

def processConfigResource(
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
onBrokerLogger: () => Unit,
onBroker: ConfigResource => Unit,
onOtherTypes: ConfigResource => Unit,
onError: (ConfigResource, Throwable) => Unit
): Unit = {
val resourceType = ConfigResource.Type.forId(resource.resourceType())
val configResource = new ConfigResource(resourceType, resource.resourceName())
try {
if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
throw new InvalidRequestException("Error due to duplicate config keys")
}
validateNullValue(resource)
validateUnknownConfigTypeError(configResource, resource)
resourceType match {
case BROKER_LOGGER => onBrokerLogger()
case BROKER => onBroker(configResource)
case _ => onOtherTypes(configResource)
}
} catch {
case t: Throwable => onError(configResource, t)
}
}

private def validateNullValue(
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
): Unit = {
val nullUpdates = new util.ArrayList[String]()
resource.configs.forEach { config =>
if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
config.value() == null) {
nullUpdates.add(config.name())
}
}
if (!nullUpdates.isEmpty) {
val exception = new InvalidRequestException("Null value not supported for : " + String.join(", ", nullUpdates))
log.error(s"Error on processing incrementalAlterConfigs request on ${resource.resourceName()}", exception)
throw exception
}
}

private def validateUnknownConfigTypeError(
configResource: ConfigResource,
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
): Unit = {
if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
val exception = new InvalidRequestException(s"Unknown resource type ${resource.resourceType()}.")
log.error(s"Error on processing request on ${configResource}", exception)
throw exception
}
}
}
84 changes: 50 additions & 34 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -721,45 +721,35 @@ class ControllerApis(
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
val brokerLoggerResponses = new util.ArrayList[AlterConfigsResourceResponse](1)
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
ConfigAdminManager.processConfigResource(
resource,
() => {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
brokerLoggerResponses.add(new AlterConfigsResourceResponse()
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType())
.setErrorCode(ApiError.NONE.error().code())
.setErrorMessage(null))
},
configResource => {
addConfigChangesOrHandleDuplicate(configResource, resource, duplicateResources, configChanges, response)
},
configResource => {
addConfigChangesOrHandleDuplicate(configResource, resource, duplicateResources, configChanges, response)
},
(_, t) => {
val err = ApiError.fromThrowable(t)
error(s"Error on processing incrementalAlterConfigs request on ${resource.resourceName()}", t)
response.responses().add(new AlterConfigsResourceResponse()
.setErrorCode(err.error().code())
.setErrorMessage(err.messageWithFallback())
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType()))
}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
} else if (!duplicateResources.contains(configResource)) {
val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
}
if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate resource.").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
}
}
)
}
val iterator = configChanges.keySet().iterator()
while (iterator.hasNext) {
Expand Down Expand Up @@ -792,6 +782,32 @@ class ControllerApis(
}
}

private def addConfigChangesOrHandleDuplicate(
configResource: ConfigResource,
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
duplicateResources: util.HashSet[ConfigResource],
configChanges: util.HashMap[ConfigResource, util.Map[String, Entry[AlterConfigOp.OpType, String]]],
response: IncrementalAlterConfigsResponseData
): Unit = {
if (!duplicateResources.contains(configResource)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use configChanges instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the configChanges collection could be used to check the duplicates, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old logic worked in three steps:

  1. The first occurrence of a configResource is added to the map.
  2. The second occurrence of the same configResource is removed from the map, added to duplicateValue, and written to the response.
  3. The third and any subsequent occurrences of that configResource are ignored entirely.

Given this behavior, I think we cannot rely solely on a map to implement this logic.

val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
}

if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
response.responses().add(new AlterConfigsResourceResponse()
.setErrorCode(INVALID_REQUEST.code())
.setErrorMessage("Duplicate resource.")
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType()))
}
}
}

private def handleCreatePartitions(request: RequestChannel.Request): CompletableFuture[Unit] = {
def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
Expand Down
103 changes: 100 additions & 3 deletions core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.server

import java.util
import java.util.Collections

import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC, UNKNOWN}
Expand All @@ -39,7 +38,8 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE}
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.metadata.MockConfigRepository
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{Assertions, Test}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -393,7 +393,7 @@ class ConfigAdminManagerTest {
val manager = newConfigAdminManager(1)
val unknown = unknownIncremental()
assertEquals(Collections.singletonMap(unknown,
new ApiError(INVALID_REQUEST, "Unknown resource type 0")),
new ApiError(INVALID_REQUEST, "Unknown resource type 0.")),
manager.preprocess(new IncrementalAlterConfigsRequestData().
setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
unknown).iterator())),
Expand All @@ -419,4 +419,101 @@ class ConfigAdminManagerTest {
assertFalse(ConfigAdminManager.containsDuplicates(Seq("foo", "bar", "baz")))
assertTrue(ConfigAdminManager.containsDuplicates(Seq("foo", "bar", "baz", "foo")))
}

@Test
def testProcessConfigResourceWithNulls(): Unit = {
val brokerLogger2 = brokerLogger2Incremental()
new IncrementalAlterConfigsRequestData()
.setResources(new IAlterConfigsResourceCollection(
util.List.of(brokerLogger2).iterator())
).resources().forEach(resource => {
val invalidRequestException = assertThrows(classOf[InvalidRequestException],
() => ConfigAdminManager.processConfigResource(
resource,
() => {},
_ => {},
_ => {},
(_, t) => {
throw t
})
)
assertEquals(s"Null value not supported for : ${logger.getName}", invalidRequestException.getMessage)
})
}

@Test
def testProcessConfigResourceWithUnknownConfigType(): Unit = {
val config = new IAlterableConfig().setName(logger.getName)
.setValue("INFO")
.setConfigOperation(OpType.SET.id())
new IncrementalAlterConfigsRequestData()
.setResources(new IAlterConfigsResourceCollection(
util.List.of(new IAlterConfigsResource()
.setResourceName("1")
.setResourceType(UNKNOWN.id())
.setConfigs(new IAlterableConfigCollection(
util.List.of(config).iterator()))).iterator()))
.resources().forEach(resource => {
val invalidRequestException = assertThrows(classOf[InvalidRequestException],
() => ConfigAdminManager.processConfigResource(
resource,
() => {},
_ => {},
_ => {},
(_, t) => {
throw t
})
)
assertEquals(s"Unknown resource type ${UNKNOWN.id()}.", invalidRequestException.getMessage)
})
}

@Test
def testProcessConfigResourceWithDuplicates(): Unit = {
val config = new IAlterableConfig().setName(logger.getName)
.setValue("INFO")
.setConfigOperation(OpType.SET.id())
val config1 = new IAlterableConfig().setName(logger.getName)
.setValue("ERROR")
.setConfigOperation(OpType.SET.id())
new IncrementalAlterConfigsRequestData()
.setResources(new IAlterConfigsResourceCollection(
util.List.of(new IAlterConfigsResource()
.setResourceName("1")
.setResourceType(BROKER_LOGGER.id)
.setConfigs(new IAlterableConfigCollection(
util.List.of(config, config1).iterator()))).iterator()))
.resources().forEach(resource => {
val invalidRequestException = assertThrows(classOf[InvalidRequestException],
() => ConfigAdminManager.processConfigResource(
resource,
() => {},
_ => {},
_ => {},
(_, t) => {
throw t
})
)
assertEquals("Error due to duplicate config keys", invalidRequestException.getMessage)
})
}

@Test
def testProcessConfigResource(): Unit = {
val brokerLogger1 = brokerLogger1Incremental()
new IncrementalAlterConfigsRequestData()
.setResources(new IAlterConfigsResourceCollection(
util.List.of(brokerLogger1).iterator())
).resources().forEach(resource => {
val processConfigResource: Executable = () => ConfigAdminManager.processConfigResource(
resource,
() => {},
_ => {},
_ => {},
(_, t) => {
throw t
})
assertDoesNotThrow(processConfigResource)
})
}
}
Loading