Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1fcfe35
completed the feature
m1a2st Nov 27, 2025
cc33873
updated the test
m1a2st Nov 27, 2025
f5f78e7
fix test
m1a2st Nov 28, 2025
59d3be2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 2, 2025
c8d913f
temp version
m1a2st Dec 2, 2025
f8d01df
temp version
m1a2st Dec 2, 2025
a165eb5
test in the kafka CI
m1a2st Dec 3, 2025
48184c2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 3, 2025
1ff2a4c
addressed by comments
m1a2st Dec 3, 2025
da21c8b
fix the fail test
m1a2st Dec 3, 2025
fdbb81c
add new test for ConfigAdminManagerTest
m1a2st Dec 3, 2025
a0198d9
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 8, 2025
d6ae031
addressed by comments
m1a2st Dec 8, 2025
138e8e8
update the addConfigChangesOrHandleDuplicate logic
m1a2st Dec 8, 2025
72e787b
update the related logic
m1a2st Dec 8, 2025
bfd3860
revert the error logic
m1a2st Dec 8, 2025
2a4c9ea
remove unused exception throwing
m1a2st Dec 8, 2025
d388063
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 20, 2025
6df68ba
resolve conflict
m1a2st Dec 20, 2025
bb8c33a
add new validator
m1a2st Dec 20, 2025
16ca216
fix the fail test
m1a2st Dec 21, 2025
ca12d22
temp
m1a2st Dec 21, 2025
ac4b124
temp patch
m1a2st Dec 24, 2025
b07102e
revert unused change
m1a2st Dec 24, 2025
500b7ae
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 24, 2025
8d1cf15
check the resource operation type
m1a2st Dec 25, 2025
d896bfb
check null value
m1a2st Dec 25, 2025
a690de7
check delete opType
m1a2st Dec 25, 2025
e7251e2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 25, 2025
374c301
Merge branch 'trunk' into KAFKA-19931
m1a2st Jan 4, 2026
59264ff
update the logic
m1a2st Jan 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 77 additions & 39 deletions core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import scala.jdk.CollectionConverters._
* KIP-412 added support for changing log4j log levels via IncrementalAlterConfigs, but
* not via the original AlterConfigs. In retrospect, this would have been better off as a
* separate RPC, since the semantics are quite different. In particular, KIP-226 configs
* are stored durably and persist across broker restarts, but KIP-412 log4j levels do not.
* are stored durably and persist across broker restarts, but KIP-412 log4j levels do not.
* However, we have to handle it here now in order to maintain compatibility.
*
* Configuration processing is split into two parts.
Expand Down Expand Up @@ -112,48 +112,33 @@ class ConfigAdminManager(nodeId: Int,
})
request.resources().forEach(resource => {
if (!results.containsKey(resource)) {
val resourceType = ConfigResource.Type.forId(resource.resourceType())
val configResource = new ConfigResource(resourceType, resource.resourceName())
try {
if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
throw new InvalidRequestException("Error due to duplicate config keys")
}
val nullUpdates = new util.ArrayList[String]()
resource.configs().forEach { config =>
if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
config.value() == null) {
nullUpdates.add(config.name())
processConfigResource(
resource,
() => {
runtimeLoggerManager.applyChangesForResource(
authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME),
request.validateOnly(),
resource)
results.put(resource, ApiError.NONE)
},
configResource => {
// The resource name must be either blank (if setting a cluster config) or
// the ID of this specific broker.
if (configResource.name().nonEmpty) {
validateResourceNameIsCurrentNodeId(resource.resourceName())
}
}
if (!nullUpdates.isEmpty) {
throw new InvalidRequestException("Null value not supported for : " +
String.join(", ", nullUpdates))
}
resourceType match {
case BROKER_LOGGER =>
runtimeLoggerManager.applyChangesForResource(
authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME),
request.validateOnly(),
resource)
results.put(resource, ApiError.NONE)
case BROKER =>
// The resource name must be either blank (if setting a cluster config) or
// the ID of this specific broker.
if (configResource.name().nonEmpty) {
validateResourceNameIsCurrentNodeId(resource.resourceName())
}
validateBrokerConfigChange(resource, configResource)
case TOPIC | CLIENT_METRICS | GROUP =>
// Nothing to do.
case _ =>
throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}")
}
} catch {
case t: Throwable =>
validateBrokerConfigChange(resource, configResource)
},
_ => {
// Nothing to do for TOPIC | CLIENT_METRICS | GROUP type in Broker.
// and UNKNOWN type is already handled in validateUnknownConfigTypeError.
},
(configResource, t) => {
val err = ApiError.fromThrowable(t)
error(s"Error preprocessing incrementalAlterConfigs request on $configResource", t)
results.put(resource, err)
}
}
)
}
})
results
Expand Down Expand Up @@ -451,4 +436,57 @@ object ConfigAdminManager {
}
}
}

def processConfigResource(
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
onBrokerLogger: () => Unit,
onBroker: ConfigResource => Unit,
onOtherTypes: ConfigResource => Unit,
onError: (ConfigResource, Throwable) => Unit
): Unit = {
val resourceType = ConfigResource.Type.forId(resource.resourceType())
val configResource = new ConfigResource(resourceType, resource.resourceName())
try {
if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
throw new InvalidRequestException("Error due to duplicate config keys")
}
validateNullValue(resource)
validateUnknownConfigTypeError(configResource, resource)
resourceType match {
case BROKER_LOGGER => onBrokerLogger()
case BROKER => onBroker(configResource)
case _ => onOtherTypes(configResource)
}
} catch {
case t: Throwable => onError(configResource, t)
}
}

def validateNullValue(
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
): Unit = {
val nullUpdates = new util.ArrayList[String]()
resource.configs.forEach { config =>
if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
config.value() == null) {
nullUpdates.add(config.name())
}
}
if (!nullUpdates.isEmpty) {
val exception = new InvalidRequestException("Null value not supported for : " + String.join(", ", nullUpdates))
log.error(s"Error on processing incrementalAlterConfigs request on ${resource.resourceName()}", exception)
throw exception
}
}

def validateUnknownConfigTypeError(
configResource: ConfigResource,
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
): Unit = {
if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
val exception = new InvalidRequestException(s"Unknown resource type ${resource.resourceType()}.")
log.error(s"Error on processing request on ${configResource}", exception)
throw exception
}
}
}
97 changes: 59 additions & 38 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -721,45 +721,40 @@ class ControllerApis(
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
val brokerLoggerResponses = new util.ArrayList[AlterConfigsResourceResponse](1)
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
} else if (!duplicateResources.contains(configResource)) {
val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
}
if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate resource.").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
ConfigAdminManager.processConfigResource(
resource,
onBrokerLogger = () => {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throwing the exception directly? The onError method should handle it, right?

}
brokerLoggerResponses.add(new AlterConfigsResourceResponse()
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType())
.setErrorCode(apiError.error().code())
.setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
},
configResource => {
addConfigChangesOrHandleDuplicate(configResource, resource, duplicateResources, configChanges, response)
},
configResource => {
addConfigChangesOrHandleDuplicate(configResource, resource, duplicateResources, configChanges, response)
},
onError = (_, t) => {
val err = ApiError.fromThrowable(t)
error(s"Error on processing incrementalAlterConfigs request on ${resource.resourceName()}", t)
response.responses().add(new AlterConfigsResourceResponse()
.setErrorCode(err.error().code())
.setErrorMessage(err.messageWithFallback())
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType()))
}
}
)
}
val iterator = configChanges.keySet().iterator()
while (iterator.hasNext) {
Expand Down Expand Up @@ -792,6 +787,32 @@ class ControllerApis(
}
}

private def addConfigChangesOrHandleDuplicate(
configResource: ConfigResource,
resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
duplicateResources: util.HashSet[ConfigResource],
configChanges: util.HashMap[ConfigResource, util.Map[String, Entry[AlterConfigOp.OpType, String]]],
response: IncrementalAlterConfigsResponseData
): Unit = {
if (!duplicateResources.contains(configResource)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use configChanges instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the configChanges collection could be used to check the duplicates, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old logic worked in three steps:

  1. The first occurrence of a configResource is added to the map.
  2. The second occurrence of the same configResource is removed from the map, added to duplicateValue, and written to the response.
  3. The third and any subsequent occurrences of that configResource are ignored entirely.

Given this behavior, I think we cannot rely solely on a map to implement this logic.

val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
}

if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
response.responses().add(new AlterConfigsResourceResponse()
.setErrorCode(INVALID_REQUEST.code())
.setErrorMessage("Duplicate resource.")
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType()))
}
}
}

private def handleCreatePartitions(request: RequestChannel.Request): CompletableFuture[Unit] = {
def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
Expand Down
34 changes: 33 additions & 1 deletion core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ class ControllerApisTest {
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()),
new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorCode(INVALID_REQUEST.code()).
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The controller error code should match the broker error code.

setErrorMessage("Unknown resource type 124.").
setResourceName("foo").
setResourceType(124.toByte),
Expand All @@ -679,6 +679,38 @@ class ControllerApisTest {
response.data().responses().asScala.toSet)
}

@Test
def testInvalidIncrementalAlterConfigsWithNullResources(): Unit = {
val requestData = new IncrementalAlterConfigsRequestData().setResources(
new AlterConfigsResourceCollection(util.Arrays.asList(
new AlterConfigsResource().
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
setName("my.custom.config").
setValue(null).
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator()))
).iterator()))
val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
controllerApis = createControllerApis(None, new MockController.Builder().build())
controllerApis.handleIncrementalAlterConfigs(request)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
ArgumentMatchers.eq(None))
assertNotNull(capturedResponse.getValue)
val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
assertEquals(Set(
new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Null value not supported for : my.custom.config").
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id())),
response.data().responses().asScala.toSet)
}

@Test
def testUnauthorizedHandleAlterPartitionReassignments(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,39 @@ public void testDescribeConfigs(ClusterInstance clusterInstance) throws Exceptio
assertEquals("2", configEntry.value());
}
}

@ClusterTest
public void testIncrementalAlterNullValueConfigsByControllers(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterNullValueConfigs(clusterInstance, true);
}

@ClusterTest
public void testIncrementalAlterNullValueConfigs(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterNullValueConfigs(clusterInstance, false);
}

private void testIncrementalAlterNullValueConfigs(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
int nodeId = usingBootstrapControllers ?
clusterInstance.controllers().values().iterator().next().config().nodeId() :
clusterInstance.brokers().values().iterator().next().config().nodeId();
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
nodeResource,
List.of(
new AlterConfigOp(new ConfigEntry("my.custom.config", null), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("my.custom.config1", null), AlterConfigOp.OpType.SET)
)
);
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES)
);
assertEquals(
"org.apache.kafka.common.errors.InvalidRequestException: " +
"Null value not supported for : my.custom.config, my.custom.config1",
exception.getMessage()
);
}
}
}
Loading