@@ -26,16 +26,13 @@ import kafka.server.share.SharePartitionManager
2626import kafka .utils .Logging
2727import org .apache .kafka .admin .AdminUtils
2828import org .apache .kafka .clients .CommonClientConfigs
29- import org .apache .kafka .clients .admin .AlterConfigOp .OpType
30- import org .apache .kafka .clients .admin .{AlterConfigOp , ConfigEntry , EndpointType }
29+ import org .apache .kafka .clients .admin .EndpointType
3130import org .apache .kafka .common .acl .AclOperation
3231import org .apache .kafka .common .acl .AclOperation ._
33- import org .apache .kafka .common .config .ConfigResource
3432import org .apache .kafka .common .errors ._
3533import org .apache .kafka .common .internals .Topic .{GROUP_METADATA_TOPIC_NAME , SHARE_GROUP_STATE_TOPIC_NAME , TRANSACTION_STATE_TOPIC_NAME , isInternal }
3634import org .apache .kafka .common .internals .{FatalExitError , Topic }
3735import org .apache .kafka .common .message .AddPartitionsToTxnResponseData .{AddPartitionsToTxnResult , AddPartitionsToTxnResultCollection }
38- import org .apache .kafka .common .message .AlterConfigsResponseData .AlterConfigsResourceResponse
3936import org .apache .kafka .common .message .DeleteRecordsResponseData .{DeleteRecordsPartitionResult , DeleteRecordsTopicResult }
4037import org .apache .kafka .common .message .ListClientMetricsResourcesResponseData .ClientMetricsResource
4138import org .apache .kafka .common .message .ListOffsetsRequestData .ListOffsetsPartition
@@ -2120,55 +2117,11 @@ class KafkaApis(val requestChannel: RequestChannel,
21202117 }
21212118 if (remaining.resources().isEmpty) {
21222119 sendResponse(Some (new AlterConfigsResponseData ()))
2123- } else if (( ! request.isForwarded) && metadataSupport.canForward()) {
2120+ } else {
21242121 metadataSupport.forwardingManager.get.forwardRequest(request,
21252122 new AlterConfigsRequest (remaining, request.header.apiVersion()),
21262123 response => sendResponse(response.map(_.data())))
2127- } else {
2128- sendResponse(Some (processLegacyAlterConfigsRequest(request, remaining)))
2129- }
2130- }
2131-
2132- def processLegacyAlterConfigsRequest (
2133- originalRequest : RequestChannel .Request ,
2134- data : AlterConfigsRequestData
2135- ): AlterConfigsResponseData = {
2136- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis .shouldAlwaysForward(originalRequest))
2137- val alterConfigsRequest = new AlterConfigsRequest (data, originalRequest.header.apiVersion())
2138- val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
2139- resource.`type` match {
2140- case ConfigResource .Type .BROKER_LOGGER =>
2141- throw new InvalidRequestException (s " AlterConfigs is deprecated and does not support the resource type ${ConfigResource .Type .BROKER_LOGGER }" )
2142- case ConfigResource .Type .BROKER | ConfigResource .Type .CLIENT_METRICS =>
2143- authHelper.authorize(originalRequest.context, ALTER_CONFIGS , CLUSTER , CLUSTER_NAME )
2144- case ConfigResource .Type .TOPIC =>
2145- authHelper.authorize(originalRequest.context, ALTER_CONFIGS , TOPIC , resource.name)
2146- case rt => throw new InvalidRequestException (s " Unexpected resource type $rt" )
2147- }
2148- }
2149- val authorizedResult = zkSupport.adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
2150- val unauthorizedResult = unauthorizedResources.keys.map { resource =>
2151- resource -> configsAuthorizationApiError(resource)
2152- }
2153- val response = new AlterConfigsResponseData ()
2154- (authorizedResult ++ unauthorizedResult).foreach { case (resource, error) =>
2155- response.responses().add(new AlterConfigsResourceResponse ()
2156- .setErrorCode(error.error.code)
2157- .setErrorMessage(error.message)
2158- .setResourceName(resource.name)
2159- .setResourceType(resource.`type`.id))
2160- }
2161- response
2162- }
2163-
2164- private def configsAuthorizationApiError (resource : ConfigResource ): ApiError = {
2165- val error = resource.`type` match {
2166- case ConfigResource .Type .BROKER | ConfigResource .Type .BROKER_LOGGER => Errors .CLUSTER_AUTHORIZATION_FAILED
2167- case ConfigResource .Type .TOPIC => Errors .TOPIC_AUTHORIZATION_FAILED
2168- case ConfigResource .Type .GROUP => Errors .GROUP_AUTHORIZATION_FAILED
2169- case rt => throw new InvalidRequestException (s " Unexpected resource type $rt for resource ${resource.name}" )
21702124 }
2171- new ApiError (error, null )
21722125 }
21732126
21742127 def handleIncrementalAlterConfigsRequest (request : RequestChannel .Request ): Unit = {
@@ -2177,15 +2130,6 @@ class KafkaApis(val requestChannel: RequestChannel,
21772130 (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS , rType, rName))
21782131 val remaining = ConfigAdminManager .copyWithoutPreprocessed(original.data(), preprocessingResponses)
21792132
2180- // Before deciding whether to forward or handle locally, a ZK broker needs to check if
2181- // the active controller is ZK or KRaft. If the controller is KRaft, we need to forward.
2182- // If the controller is ZK, we need to process the request locally.
2183- val isKRaftController = metadataSupport match {
2184- case ZkSupport (_, _, _, _, metadataCache, _) =>
2185- metadataCache.getControllerId.exists(_.isInstanceOf [KRaftCachedControllerId ])
2186- case RaftSupport (_, _) => true
2187- }
2188-
21892133 def sendResponse (secondPart : Option [ApiMessage ]): Unit = {
21902134 secondPart match {
21912135 case Some (result : IncrementalAlterConfigsResponseData ) =>
@@ -2198,49 +2142,13 @@ class KafkaApis(val requestChannel: RequestChannel,
21982142 }
21992143 }
22002144
2201- // Forwarding has not happened yet, so handle both ZK and KRaft cases here
22022145 if (remaining.resources().isEmpty) {
22032146 sendResponse(Some (new IncrementalAlterConfigsResponseData ()))
2204- } else if (( ! request.isForwarded) && metadataSupport.canForward() && isKRaftController) {
2147+ } else {
22052148 metadataSupport.forwardingManager.get.forwardRequest(request,
22062149 new IncrementalAlterConfigsRequest (remaining, request.header.apiVersion()),
22072150 response => sendResponse(response.map(_.data())))
2208- } else {
2209- sendResponse(Some (processIncrementalAlterConfigsRequest(request, remaining)))
2210- }
2211- }
2212-
2213- def processIncrementalAlterConfigsRequest (
2214- originalRequest : RequestChannel .Request ,
2215- data : IncrementalAlterConfigsRequestData
2216- ): IncrementalAlterConfigsResponseData = {
2217- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis .shouldAlwaysForward(originalRequest))
2218- val configs = data.resources.iterator.asScala.map { alterConfigResource =>
2219- val configResource = new ConfigResource (ConfigResource .Type .forId(alterConfigResource.resourceType),
2220- alterConfigResource.resourceName)
2221- configResource -> alterConfigResource.configs.iterator.asScala.map {
2222- alterConfig => new AlterConfigOp (new ConfigEntry (alterConfig.name, alterConfig.value),
2223- OpType .forId(alterConfig.configOperation))
2224- }.toBuffer
2225- }.toMap
2226-
2227- val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
2228- resource.`type` match {
2229- case ConfigResource .Type .BROKER | ConfigResource .Type .BROKER_LOGGER | ConfigResource .Type .CLIENT_METRICS =>
2230- authHelper.authorize(originalRequest.context, ALTER_CONFIGS , CLUSTER , CLUSTER_NAME )
2231- case ConfigResource .Type .TOPIC =>
2232- authHelper.authorize(originalRequest.context, ALTER_CONFIGS , TOPIC , resource.name)
2233- case ConfigResource .Type .GROUP =>
2234- authHelper.authorize(originalRequest.context, ALTER_CONFIGS , GROUP , resource.name)
2235- case rt => throw new InvalidRequestException (s " Unexpected resource type $rt" )
2236- }
2237- }
2238-
2239- val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, data.validateOnly)
2240- val unauthorizedResult = unauthorizedResources.keys.map { resource =>
2241- resource -> configsAuthorizationApiError(resource)
22422151 }
2243- new IncrementalAlterConfigsResponse (0 , (authorizedResult ++ unauthorizedResult).asJava).data()
22442152 }
22452153
22462154 def handleDescribeConfigsRequest (request : RequestChannel .Request ): Unit = {
0 commit comments