Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1fcfe35
completed the feature
m1a2st Nov 27, 2025
cc33873
updated the test
m1a2st Nov 27, 2025
f5f78e7
fix test
m1a2st Nov 28, 2025
59d3be2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 2, 2025
c8d913f
temp version
m1a2st Dec 2, 2025
f8d01df
temp version
m1a2st Dec 2, 2025
a165eb5
test in the kafka CI
m1a2st Dec 3, 2025
48184c2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 3, 2025
1ff2a4c
addressed by comments
m1a2st Dec 3, 2025
da21c8b
fix the fail test
m1a2st Dec 3, 2025
fdbb81c
add new test for ConfigAdminManagerTest
m1a2st Dec 3, 2025
a0198d9
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 8, 2025
d6ae031
addressed by comments
m1a2st Dec 8, 2025
138e8e8
update the addConfigChangesOrHandleDuplicate logic
m1a2st Dec 8, 2025
72e787b
update the related logic
m1a2st Dec 8, 2025
bfd3860
revert the error logic
m1a2st Dec 8, 2025
2a4c9ea
remove unused exception throwing
m1a2st Dec 8, 2025
d388063
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 20, 2025
6df68ba
resolve conflict
m1a2st Dec 20, 2025
bb8c33a
add new validator
m1a2st Dec 20, 2025
16ca216
fix the fail test
m1a2st Dec 21, 2025
ca12d22
temp
m1a2st Dec 21, 2025
ac4b124
temp patch
m1a2st Dec 24, 2025
b07102e
revert unused change
m1a2st Dec 24, 2025
500b7ae
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 24, 2025
8d1cf15
check the resource operation type
m1a2st Dec 25, 2025
d896bfb
check null value
m1a2st Dec 25, 2025
a690de7
check delete opType
m1a2st Dec 25, 2025
e7251e2
Merge branch 'trunk' into KAFKA-19931
m1a2st Dec 25, 2025
374c301
Merge branch 'trunk' into KAFKA-19931
m1a2st Jan 4, 2026
59264ff
update the logic
m1a2st Jan 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, USER}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
Expand Down Expand Up @@ -720,47 +721,76 @@ class ControllerApis(
val configChanges = new util.HashMap[ConfigResource,
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
val brokerLoggerResponses = new util.ArrayList[AlterConfigsResourceResponse](1)
val nullConfigsErrorResults = new util.IdentityHashMap[AlterConfigsResource, ApiError]()
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
try {
val nullUpdates = new util.ArrayList[String]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, there are quite a few pre-processing checks in ConfigAdminManager. The doc says the following. Could we restructure the code between ConfigAdminManager and ControllerApis to (1) avoid duplicates logic in verification (2) prevent missing verification in one of the two places in the future?

        // BROKER_LOGGER requests always go to a specific, constant broker or controller node.
        //
        // BROKER resource changes for a specific (non-default) resource go to either that specific
        // node (if using bootstrap.servers), or directly to the active controller (if using
        // bootstrap.controllers)
        //
        // All other requests go to the least loaded broker (if using bootstrap.servers) or the
        // active controller (if using bootstrap.controllers)

resource.configs().forEach { config =>
if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
config.value() == null) {
nullUpdates.add(config.name())
}
}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
} else if (!duplicateResources.contains(configResource)) {
val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
if (!nullUpdates.isEmpty) {
throw new InvalidRequestException("Null value not supported for : " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to just set the error response here instead of throwing?

String.join(", ", nullUpdates))
}
if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throwing the exception directly? The onError method should handle it, right?

}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate resource.").
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
} else if (!duplicateResources.contains(configResource)) {
val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
}
if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate resource.").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
}
}
} catch {
case t: Throwable =>
val err = ApiError.fromThrowable(t)
error(s"Error on processing incrementalAlterConfigs request on ${resource.resourceName()}", t)
nullConfigsErrorResults.put(resource, err)
}
}
if (!nullConfigsErrorResults.isEmpty) {
nullConfigsErrorResults.forEach((resource, apiError) => {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.messageWithFallback()).
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
configChanges.remove(resource)
})
}
val iterator = configChanges.keySet().iterator()
while (iterator.hasNext) {
val resource = iterator.next()
Expand Down
39 changes: 39 additions & 0 deletions core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,13 @@ class ControllerApisTest {
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
setName("consumer.session.timeout.ms").
setValue("50000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
setResourceName("null-value-resource").
setResourceType(ConfigResource.Type.TOPIC.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
setName("my.custom.config").
setValue(null).
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator()))
).iterator()))
val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
Expand Down Expand Up @@ -679,6 +686,38 @@ class ControllerApisTest {
response.data().responses().asScala.toSet)
}

@Test
def testInvalidIncrementalAlterConfigsWithNullResources(): Unit = {
val requestData = new IncrementalAlterConfigsRequestData().setResources(
new AlterConfigsResourceCollection(util.Arrays.asList(
new AlterConfigsResource().
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
setName("my.custom.config").
setValue(null).
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator()))
).iterator()))
val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
controllerApis = createControllerApis(None, new MockController.Builder().build())
controllerApis.handleIncrementalAlterConfigs(request)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
ArgumentMatchers.eq(None))
assertNotNull(capturedResponse.getValue)
val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
assertEquals(Set(
new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Null value not supported for : my.custom.config").
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id())),
response.data().responses().asScala.toSet)
}

@Test
def testUnauthorizedHandleAlterPartitionReassignments(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,39 @@ public void testDescribeConfigs(ClusterInstance clusterInstance) throws Exceptio
assertEquals("2", configEntry.value());
}
}

@ClusterTest
public void testIncrementalAlterNullValueConfigsByControllers(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterNullValueConfigs(clusterInstance, true);
}

@ClusterTest
public void testIncrementalAlterNullValueConfigs(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterNullValueConfigs(clusterInstance, false);
}

private void testIncrementalAlterNullValueConfigs(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
int nodeId = usingBootstrapControllers ?
clusterInstance.controllers().values().iterator().next().config().nodeId() :
clusterInstance.brokers().values().iterator().next().config().nodeId();
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
nodeResource,
List.of(
new AlterConfigOp(new ConfigEntry("my.custom.config", null), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("my.custom.config1", null), AlterConfigOp.OpType.SET)
)
);
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES)
);
assertEquals(
"org.apache.kafka.common.errors.InvalidRequestException: " +
"Null value not supported for : my.custom.config, my.custom.config1",
exception.getMessage()
);
}
}
}
Loading