diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index 825d2e67f70d7..2c8ea3aeef3c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -35,6 +35,14 @@ */ public interface MetadataUpdater extends Closeable { + /** + * Gets the current cluster id without blocking. + * @return the cluster id, or null if unknown + */ + default String clusterId() { + return null; + } + /** * Gets the current cluster info without blocking. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 692847a8b1553..b396c5ccbb772 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -1107,6 +1108,13 @@ private void handleInitiateApiVersionRequests(long now) { // not before ready. this.connectionStates.checkingApiVersions(node); ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue(); + String clusterId = this.metadataUpdater.clusterId(); + int nodeId = Integer.parseInt(node); + if (clusterId != null && nodeId < Integer.MAX_VALUE / 2) { + System.out.println("CLUSTER_ID(" + clusterId + "), NODE_ID(" + nodeId + ")"); + apiVersionRequestBuilder.setClusterId(clusterId); + apiVersionRequestBuilder.setNodeId(Integer.parseInt(node)); + } ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true); doSend(clientRequest, true, now); iter.remove(); @@ -1193,6 +1201,15 @@ class DefaultMetadataUpdater implements MetadataUpdater { this.inProgress = null; } + @Override + public String clusterId() { + ClusterResource clusterResource = metadata.fetch().clusterResource(); + if (clusterResource != null) { + return clusterResource.clusterId(); + } + return null; + } + @Override public List fetchNodes() { return metadata.fetch().nodes(); @@ -1296,6 +1313,7 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() == Errors.REBOOTSTRAP_REQUIRED) { log.info("Rebootstrap requested by server."); + log.error("REBOOTSTRAP REQUESTED BY SERVER."); initiateRebootstrap(); } else if (response.brokers().isEmpty()) { // When talking to the startup phase of a broker, it is possible to receive an empty metadata set, which diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 1bdb0903c7d7d..f89e94516117f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -56,6 +56,14 @@ public Builder( this.data = data.duplicate(); } + public void setClusterId(String clusterId) { + this.data.setClusterId(clusterId); + } + + public void setNodeId(int nodeId) { + this.data.setNodeId(nodeId); + } + @Override public ApiVersionsRequest build(short version) { return new ApiVersionsRequest(data, version); @@ -94,6 +102,12 @@ public boolean hasUnsupportedRequestVersion() { } public boolean isValid() { + if (version() >= 5) { + // Either cluster ID and node ID are both specified, or neither is. + if ((data.clusterId() == null && data.nodeId() != -1) || (data.clusterId() != null && data.nodeId() == -1)) { + return false; + } + } if (version() >= 3) { return SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareName()).matches() && SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareVersion()).matches(); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 144696be4341c..92e7f4cef3207 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1529,10 +1529,22 @@ class KafkaApis(val requestChannel: RequestChannel, apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception) } else if (!apiVersionRequest.isValid) { apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception) + } else if (clusterIdOrNodeIdIsInvalid(apiVersionRequest)) { + apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.REBOOTSTRAP_REQUIRED.exception) } else { apiVersionManager.apiVersionResponse(requestThrottleMs, request.header.apiVersion() < 4) } } + + // KIP-1242 checks the cluster ID and node ID in the request if provided to ensure the + // client is connecting to the correct broker. If both are specified, they must match + // the expected values for this broker. + def clusterIdOrNodeIdIsInvalid(apiVersionRequest: ApiVersionsRequest): Boolean = { + apiVersionRequest.version >= 5 && + apiVersionRequest.data.clusterId != null && + (!apiVersionRequest.data.clusterId.equals(clusterId) || apiVersionRequest.data.nodeId != brokerId) + } + requestHelper.sendResponseMaybeThrottle(request, createResponseCallback) }