From c97b88d5db4de28d9f51bb11fb71ddd6217c7dda Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 5 Jun 2023 11:03:48 +0200 Subject: [PATCH 1/8] Bump version to 3.5.0 --- gradle.properties | 2 +- streams/quickstart/java/pom.xml | 2 +- .../java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- tests/kafkatest/version.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gradle.properties b/gradle.properties index f6093c3b63a42..9ceb57ad1d527 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=3.5.0-SNAPSHOT +version=3.5.0 scalaVersion=2.13.10 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 104d4e6c929ce..6cc7508d31904 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 3.5.0-SNAPSHOT + 3.5.0 .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 64b06a40287d9..d45ce8a2df0b2 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 3.5.0-SNAPSHOT + 3.5.0 1.7.36 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 05435be22be76..7e64cf5c75f93 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 3.5.0-SNAPSHOT + 3.5.0 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 74ec83d9bf7c9..4a121a90f6ad6 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '3.5.0.dev0' +__version__ = '3.5.0' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 967674b551c5e..567d69dc0ccc1 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -120,7 +120,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("3.6.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("3.5.0-SNAPSHOT") LATEST_METADATA_VERSION = "3.3" From 484a86feb562f645bdbec74b18f8a28395a686f7 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 13 Jun 2023 11:29:34 +0200 Subject: [PATCH 2/8] MINOR: Bump version to 3.5.1-SNAPSHOT --- docs/js/templateData.js | 2 +- gradle.properties | 2 +- kafka-merge-pr.py | 2 +- streams/quickstart/java/pom.xml | 2 +- .../java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- tests/kafkatest/version.py | 5 +++-- 8 files changed, 10 insertions(+), 9 deletions(-) diff --git a/docs/js/templateData.js b/docs/js/templateData.js index e23d6ba56d32b..946be6e381c6f 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -19,6 +19,6 @@ limitations under the License. 
var context={ "version": "35", "dotVersion": "3.5", - "fullDotVersion": "3.5.0", + "fullDotVersion": "3.5.1", "scalaVersion": "2.13" }; diff --git a/gradle.properties b/gradle.properties index 9ceb57ad1d527..faa7a0e067dd3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=3.5.0 +version=3.5.1-SNAPSHOT scalaVersion=2.13.10 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index 7c4f662037386..04dba2cd462c9 100755 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -70,7 +70,7 @@ DEV_BRANCH_NAME = "trunk" -DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "3.5.0") +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "3.5.1") ORIGINAL_HEAD = "" diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 6cc7508d31904..b7eb06f9bfa43 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 3.5.0 + 3.5.1-SNAPSHOT .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index d45ce8a2df0b2..d75f0e71239f7 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 3.5.0 + 3.5.1-SNAPSHOT 1.7.36 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 7e64cf5c75f93..7a097fe68b2b5 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 3.5.0 + 3.5.1-SNAPSHOT Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 4a121a90f6ad6..84137f8197b4a 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '3.5.0' +__version__ = '3.5.1.dev0' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 567d69dc0ccc1..060c963e20799 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -120,7 +120,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("3.5.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("3.5.1-SNAPSHOT") LATEST_METADATA_VERSION = "3.3" @@ -249,4 +249,5 @@ def get_version(node=None): # 3.5.x versions V_3_5_0 = KafkaVersion("3.5.0") -LATEST_3_5 = V_3_5_0 +V_3_5_1 = KafkaVersion("3.5.1") +LATEST_3_5 = V_3_5_1 From 7b4d9944b78baab744e1f063fca8af019ecb0983 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 13 Jun 2023 15:18:54 +0200 Subject: [PATCH 3/8] KAFKA-15080; Fetcher's lag never set when partition is idle (#13843) The PartitionFetchState's lag field is set to None when the state is created and it is updated when bytes are received for a partition. For idle partitions (newly created or not), the lag is never updated because `validBytes > 0` is never true. As a side effect, the partition is considered out-of-sync and could be incorrectly throttled. 
Reviewers: Divij Vaidya , Jason Gustafson --- .../kafka/server/AbstractFetcherThread.scala | 4 +- .../server/ReplicaFetcherThreadTest.scala | 105 +++++++++++++++++- 2 files changed, 102 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index a1c5d0ed2fae1..7c5fa6960559a 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -363,13 +363,13 @@ abstract class AbstractFetcherThread(name: String, fetcherLagStats.getAndMaybePut(topicPartition).lag = lag // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data - if (validBytes > 0 && partitionStates.contains(topicPartition)) { + if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) { // Update partitionStates only if there is no exception during processPartitionData val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching, logAppendInfo.lastLeaderEpoch.asScala) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) - fetcherStats.byteRate.mark(validBytes) + if (validBytes > 0) fetcherStats.byteRate.mark(validBytes) } } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index bc3ea086f0c83..e58532622e3bd 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -30,13 +30,13 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats, SimpleRecord} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest} import org.apache.kafka.common.utils.{LogContext, SystemTime} -import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} +import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 -import org.apache.kafka.storage.internals.log.LogAppendInfo +import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -47,8 +47,7 @@ import org.mockito.Mockito.{mock, never, times, verify, when} import java.nio.charset.StandardCharsets import java.util -import java.util.{Collections, Optional} - +import java.util.{Collections, Optional, OptionalInt} import scala.collection.{Map, mutable} import scala.jdk.CollectionConverters._ @@ -742,6 +741,102 @@ class ReplicaFetcherThreadTest { verify(log, times(0)).maybeUpdateHighWatermark(anyLong()) } + @Test + def testLagIsUpdatedWhenNoRecords(): Unit = { + val config = 
KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) + val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager]) + val logManager: LogManager = mock(classOf[LogManager]) + val log: UnifiedLog = mock(classOf[UnifiedLog]) + val partition: Partition = mock(classOf[Partition]) + val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + + when(log.highWatermark).thenReturn(0) + when(log.latestEpoch).thenReturn(Some(0)) + when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0))) + when(log.logEndOffset).thenReturn(0) + when(log.maybeUpdateHighWatermark(0)).thenReturn(None) + + when(replicaManager.metadataCache).thenReturn(metadataCache) + when(replicaManager.logManager).thenReturn(logManager) + when(replicaManager.localLogOrException(t1p0)).thenReturn(log) + when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition) + when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats])) + + when(partition.localLogOrException).thenReturn(log) + when(partition.appendRecordsToFollowerOrFutureReplica(any(), any())).thenReturn(Some(new LogAppendInfo( + Optional.empty[LogOffsetMetadata], + 0, + OptionalInt.empty, + RecordBatch.NO_TIMESTAMP, + -1L, + RecordBatch.NO_TIMESTAMP, + -1L, + RecordConversionStats.EMPTY, + CompressionType.NONE, + CompressionType.NONE, + -1, + 0, // No records. + false, + -1L + ))) + + val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ") + + val mockNetwork = new MockBlockingSender( + Collections.emptyMap(), + brokerEndPoint, + new SystemTime() + ) + + val leader = new RemoteLeaderEndPoint( + logContext.logPrefix, + mockNetwork, + new FetchSessionHandler(logContext, brokerEndPoint.id), + config, + replicaManager, + quota, + () => config.interBrokerProtocolVersion, + () => 1 + ) + + val thread = new ReplicaFetcherThread( + "fetcher-thread", + leader, + config, + failedPartitions, + replicaManager, + quota, + logContext.logPrefix, + () => config.interBrokerProtocolVersion + ) + + thread.addPartitions(Map( + t1p0 -> initialFetchState(Some(topicId1), 0)) + ) + + // Lag is initialized to None when the partition fetch + // state is created. + assertEquals(None, thread.fetchState(t1p0).flatMap(_.lag)) + + // Prepare the fetch response data. + mockNetwork.setFetchPartitionDataForNextResponse(Map( + t1p0 -> new FetchResponseData.PartitionData() + .setPartitionIndex(t1p0.partition) + .setLastStableOffset(0) + .setLogStartOffset(0) + .setHighWatermark(0) + .setRecords(MemoryRecords.EMPTY) // No records. + )) + mockNetwork.setIdsForNextResponse(topicIds) + + // Sends the fetch request and processes the response. + thread.doWork() + assertEquals(1, mockNetwork.fetchCount) + + // Lag is set to Some(0). 
+ assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag)) + } + @Test def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = { From a143e7c6db4dd93d3c362a3c9fca5c255eedea0b Mon Sep 17 00:00:00 2001 From: vamossagar12 Date: Wed, 19 Apr 2023 00:46:03 +0530 Subject: [PATCH 4/8] MINOR: Fixing gradle build during compileScala and compileTestScala (#13588) Reviewers: Chia-Ping Tsai --- core/src/main/java/kafka/log/remote/RemoteLogManager.java | 2 +- core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index a2b7cd0b88d51..1048cd5020fc9 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -454,7 +454,7 @@ private void maybeUpdateReadOffset() throws RemoteStorageException { // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader - // epoch cache then it starts copying the segments from the earliest epoch entry’s offset. + // epoch cache then it starts copying the segments from the earliest epoch entry's offset. copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition)); } } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index e56fbd45a08f2..c4cfc00b57b9c 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -202,7 +202,7 @@ void testStartup() { } // This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150. - // The leader epochs are [0->0, 1->100, 2->200]. We are verifying: + // The leader epochs are [0->0, 1->100, 2->200]. We are verifying: // 1. There's only 1 segment copied to remote storage // 2. The segment got copied to remote storage is the old segment, not the active one // 3. The log segment metadata stored into remoteLogMetadataManager is what we expected, both before and after copying the log segments From 60cbfba98e521b2bbcb09f02e77bcb877225bc53 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 16 Jun 2023 11:03:57 -0400 Subject: [PATCH 5/8] MINOR: Add ZK migration instructions to the operations documentation (#13257) Reviewers: Mickael Maison --- docs/ops.html | 206 ++++++++++++++++++++++++++++++++++++++++++++++++-- docs/toc.html | 1 + 2 files changed, 201 insertions(+), 6 deletions(-) diff --git a/docs/ops.html b/docs/ops.html index fae7a035d1189..c93ae9aa58d12 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3555,22 +3555,216 @@
Deploying Considerations
    -
  • Kafka server's process.role should be set to either broker or controller but not both. Combined mode can be used in development enviroment but it should be avoided in critical deployment evironments.
  • -
  • For redundancy, a Kafka cluster should use 3 controllers. More than 3 servers is not recommended in critical environments. In the rare case of a partial network failure it is possible for the cluster metadata quorum to become unavailable. This limitation will be addresses in a future release of Kafka.
  • -
  • The Kafka controllers store all of the metadata for the cluster in memory and on disk. We believe that for a typical Kafka cluster 5GB of main memory and 5GB of disk space on the metadata log director is sufficient.
  • +
  • Kafka server's process.role should be set to either broker or controller but not both. Combined mode can be used in development environments, but it should be avoided in critical deployment environments.
  • +
  • For redundancy, a Kafka cluster should use 3 controllers. More than 3 controllers is not recommended in critical environments. In the rare case of a partial network failure it is possible for the cluster metadata quorum to become unavailable. This limitation will be addressed in a future release of Kafka.
  • +
  • The Kafka controllers store all the metadata for the cluster in memory and on disk. We believe that for a typical Kafka cluster 5GB of main memory and 5GB of disk space on the metadata log directory is sufficient.
  • +

Missing Features

-

The following features are not fullying implemented in KRaft mode:

+

The following features are not fully implemented in KRaft mode:

    -
  • Configuring SCRAM users via the administrative API
  • Supporting JBOD configurations with multiple storage directories
  • Modifying certain dynamic configurations on the standalone KRaft controller
  • Delegation tokens
  • -
  • Upgrade from ZooKeeper mode
+

ZooKeeper to KRaft Migration

+ +

+ ZooKeeper to KRaft migration is considered an Early Access feature and is not recommended for production clusters. +

+ +

The following features are not yet supported for ZK to KRaft migrations:

+ + + +

+ Please report issues with ZooKeeper to KRaft migration using the + project JIRA and the "kraft" component. +

+ +

Terminology

+

+ We use the term "migration" here to refer to the process of changing a Kafka cluster's metadata + system from ZooKeeper to KRaft and migrating existing metadata. An "upgrade" refers to installing a newer version of Kafka. It is not recommended to + upgrade the software at the same time as performing a metadata migration. +

+ +

+ We also use the term "ZK mode" to refer to Kafka brokers which are using ZooKeeper as their metadata + system. "KRaft mode" refers to Kafka brokers which are using a KRaft controller quorum as their metadata system. +

+ +

Preparing for migration

+

+ Before beginning the migration, the Kafka brokers must be upgraded to software version 3.5.0 and have the + "inter.broker.protocol.version" configuration set to "3.5". See Upgrading to 3.5.0 for + upgrade instructions. +

+ +
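For reference, a minimal sketch of the override described in the paragraph above, using the same property name and value that appear in the full broker sample later in this section; it would be applied to each broker's server.properties before the migration starts:

inter.broker.protocol.version=3.5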

+ It is recommended to enable TRACE level logging for the migration components while the migration is active. This can + be done by adding the following log4j configuration to each KRaft controller's "log4j.properties" file. +

+ +
log4j.logger.org.apache.kafka.metadata.migration=TRACE
+ +

+ It is generally useful to enable DEBUG logging on the KRaft controllers and the ZK brokers during the migration. +

+ +
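As a hedged sketch only (this document does not name the exact loggers to raise, so the package names below are assumptions based on where the KRaft controller and broker code live), DEBUG logging could be enabled in the relevant log4j.properties files with entries such as:

log4j.logger.org.apache.kafka.controller=DEBUG
log4j.logger.kafka.server=DEBUG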

Provisioning the KRaft controller quorum

+

+ Two things are needed before the migration can begin. First, the brokers must be configured to support the migration and second, + a KRaft controller quorum must be deployed. The KRaft controllers should be provisioned with the same cluster ID as + the existing Kafka cluster. This can be found by examining one of the "meta.properties" files in the data directories + of the brokers, or by running the following command. +

+ +
./bin/zookeeper-shell.sh localhost:2181 get /cluster/id
+ +

+ The KRaft controller quorum should also be provisioned with the latest metadata.version of "3.4". + For further instructions on KRaft deployment, please refer to the above documentation. +

+ +
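A minimal provisioning sketch, assuming the controller configuration shown below is saved as config/kraft/controller.properties and &lt;cluster-id&gt; is the value returned by the zookeeper-shell command above; each controller's metadata log directory is formatted with the existing cluster ID before the controller is started (see the KRaft deployment documentation for the authoritative steps):

./bin/kafka-storage.sh format -t &lt;cluster-id&gt; -c config/kraft/controller.properties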

+ In addition to the standard KRaft configuration, the KRaft controllers will need to enable support for the migration + as well as provide ZooKeeper connection configuration. +

+ +

+ Here is a sample config for a KRaft controller that is ready for migration: +

+
+# Sample KRaft cluster controller.properties listening on 9093
+process.roles=controller
+node.id=3000
+controller.quorum.voters=3000@localhost:9093
+controller.listener.names=CONTROLLER
+listeners=CONTROLLER://:9093
+
+# Enable the migration
+zookeeper.metadata.migration.enable=true
+
+# ZooKeeper client configuration
+zookeeper.connect=localhost:2181
+
+# Other configs ...
+ +

Note: The KRaft cluster node.id values must be different from any existing ZK broker broker.id. + In KRaft-mode, the brokers and controllers share the same Node ID namespace.

+ +

Enabling the migration on the brokers

+

+ Once the KRaft controller quorum has been started, the brokers will need to be reconfigured and restarted. Brokers + may be restarted in a rolling fashion to avoid impacting cluster availability. Each broker requires the + following configuration to communicate with the KRaft controllers and to enable the migration. +

+ + + +

Here is a sample config for a broker that is ready for migration:

+ +
+# Sample ZK broker server.properties listening on 9092
+broker.id=0
+listeners=PLAINTEXT://:9092
+advertised.listeners=PLAINTEXT://localhost:9092
+listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+
+# Set the IBP
+inter.broker.protocol.version=3.5
+
+# Enable the migration
+zookeeper.metadata.migration.enable=true
+
+# ZooKeeper client configuration
+zookeeper.connect=localhost:2181
+
+# KRaft controller quorum configuration
+controller.quorum.voters=3000@localhost:9093
+controller.listener.names=CONTROLLER
+ +

+ Note: Once the final ZK broker has been restarted with the necessary configuration, the migration will automatically begin. + When the migration is complete, an INFO level log can be observed on the active controller: +

+ +
Completed migration of metadata from Zookeeper to KRaft
+ +

Migrating brokers to KRaft

+

+ Once the KRaft controller completes the metadata migration, the brokers will still be running in ZK mode. While the + KRaft controller is in migration mode, it will continue sending controller RPCs to the ZK mode brokers. This includes + RPCs like UpdateMetadata and LeaderAndIsr. +

+ +

+ To migrate the brokers to KRaft, they simply need to be reconfigured as KRaft brokers and restarted. Using the above + broker configuration as an example, we would replace the broker.id with node.id and add + process.roles=broker. It is important that the broker maintain the same Broker/Node ID when it is restarted. + The zookeeper configurations should be removed at this point. +

+ +
+# Sample KRaft broker server.properties listening on 9092
+process.roles=broker
+node.id=0
+listeners=PLAINTEXT://:9092
+advertised.listeners=PLAINTEXT://localhost:9092
+listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+
+# Don't set the IBP, KRaft uses "metadata.version" feature flag
+# inter.broker.protocol.version=3.5
+
+# Remove the migration enabled flag
+# zookeeper.metadata.migration.enable=true
+
+# Remove ZooKeeper client configuration
+# zookeeper.connect=localhost:2181
+
+# Keep the KRaft controller quorum configuration
+controller.quorum.voters=3000@localhost:9093
+controller.listener.names=CONTROLLER
+ +

+ Each broker is restarted with a KRaft configuration until the entire cluster is running in KRaft mode. +

+ +

Finalizing the migration

+

+ Once all brokers have been restarted in KRaft mode, the last step to finalize the migration is to take the + KRaft controllers out of migration mode. This is done by removing the "zookeeper.metadata.migration.enable" + property from each of their configs and restarting them one at a time. +

+ +
+# Sample KRaft cluster controller.properties listening on 9093
+process.roles=controller
+node.id=3000
+controller.quorum.voters=3000@localhost:9093
+controller.listener.names=CONTROLLER
+listeners=CONTROLLER://:9093
+
+# Disable the migration
+# zookeeper.metadata.migration.enable=true
+
+# Remove ZooKeeper client configuration
+# zookeeper.connect=localhost:2181
+
+# Other configs ...
+
diff --git a/docs/toc.html b/docs/toc.html index 468d4edbdbf33..003bced7e217a 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -162,6 +162,7 @@
  • Debugging
  • Deploying Considerations
  • Missing Features
  • +
  • ZooKeeper to KRaft Migration
  • From 23f1f2b7c4e2ad14ac1cca6780c76a2d9e95b8f0 Mon Sep 17 00:00:00 2001 From: Manyanda Chitimbo Date: Mon, 19 Jun 2023 10:35:49 +0200 Subject: [PATCH 6/8] KAFKA-15096: Update snappy-java to 1.1.10.1 (#13865) The release notes are available at https://github.com/xerial/snappy-java/releases/tag/v1.1.10.1 Reviewers: Divij Vaidya , Josep Prat --- LICENSE-binary | 2 +- gradle/dependencies.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 2c89fddc2694c..ffddc94551bf7 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -252,7 +252,7 @@ scala-library-2.13.10 scala-logging_2.13-3.9.4 scala-reflect-2.13.10 scala-java8-compat_2.13-1.0.2 -snappy-java-1.1.10.0 +snappy-java-1.1.10.1 swagger-annotations-2.2.8 zookeeper-3.6.4 zookeeper-jute-3.6.4 diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 98154c9230c00..6354d663de840 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -122,7 +122,7 @@ versions += [ scalaJava8Compat : "1.0.2", scoverage: "1.9.3", slf4j: "1.7.36", - snappy: "1.1.10.0", + snappy: "1.1.10.1", spotbugs: "4.7.3", swaggerAnnotations: "2.2.8", swaggerJaxrs2: "2.2.8", From fd46b8193719a6d32909417a7a60f30dc669c1d2 Mon Sep 17 00:00:00 2001 From: "minjian.cai" Date: Wed, 21 Jun 2023 04:16:16 +0800 Subject: [PATCH 7/8] MINOR: Fix typos for connect (#13885) Reviewers: Chris Egerton --- .../apache/kafka/connect/mirror/Checkpoint.java | 2 +- .../connect/mirror/MirrorMakerConfigTest.java | 4 ++-- .../connect/runtime/TopicCreationConfig.java | 4 ++-- .../runtime/WorkerTransactionContext.java | 8 ++++---- .../ErrorHandlingIntegrationTest.java | 4 ++-- .../connect/runtime/ErrorHandlingTaskTest.java | 2 +- .../runtime/isolation/PluginDescTest.java | 16 ++++++++-------- .../runtime/isolation/PluginUtilsTest.java | 2 +- .../runtime/rest/ConnectRestServerTest.java | 2 +- .../storage/KafkaConfigBackingStoreTest.java | 6 +++--- .../util/clusters/EmbeddedKafkaCluster.java | 2 +- 11 files changed, 26 insertions(+), 26 deletions(-) diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java index 0d746f5834db9..8f186400dd29d 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java @@ -94,7 +94,7 @@ public OffsetAndMetadata offsetAndMetadata() { @Override public String toString() { return String.format("Checkpoint{consumerGroupId=%s, topicPartition=%s, " - + "upstreamOffset=%d, downstreamOffset=%d, metatadata=%s}", + + "upstreamOffset=%d, downstreamOffset=%d, metadata=%s}", consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java index 085b56f6f7632..2611c67ad2f01 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java @@ -105,7 +105,7 @@ public void testClientConfigProperties() { assertEquals("b__topic1", aClientConfig.replicationPolicy().formatRemoteTopic("b", "topic1"), "replication.policy.separator is honored"); assertEquals(clusterABootstrap, aClientConfig.adminConfig().get("bootstrap.servers"), - 
"client configs include boostrap.servers"); + "client configs include bootstrap.servers"); assertEquals(ForwardingAdmin.class.getName(), aClientConfig.forwardingAdmin(aClientConfig.adminConfig()).getClass().getName(), "Cluster a uses the default ForwardingAdmin"); assertEquals("PLAINTEXT", aClientConfig.adminConfig().get("security.protocol"), @@ -115,7 +115,7 @@ public void testClientConfigProperties() { assertFalse(aClientConfig.adminConfig().containsKey("xxx"), "unknown properties aren't included in client configs"); assertFalse(aClientConfig.adminConfig().containsKey("metric.reporters"), - "top-leve metrics reporters aren't included in client configs"); + "top-level metrics reporters aren't included in client configs"); assertEquals("secret2", aClientConfig.getPassword("ssl.key.password").value(), "security properties are translated from external sources"); assertEquals("secret2", ((Password) aClientConfig.adminConfig().get("ssl.key.password")).value(), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java index 0fee904d12688..11c2ba9d37425 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java @@ -98,7 +98,7 @@ private static void validateReplicationFactor(String configName, short factor) { } } - public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultParitionCount) { + public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultPartitionCount) { int orderInGroup = 0; ConfigDef configDef = new ConfigDef(); configDef @@ -115,7 +115,7 @@ public static ConfigDef configDef(String group, short defaultReplicationFactor, ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup, ConfigDef.Width.LONG, "Replication Factor for Topics in " + group) .define(PARTITIONS_CONFIG, ConfigDef.Type.INT, - defaultParitionCount, PARTITIONS_VALIDATOR, + defaultPartitionCount, PARTITIONS_VALIDATOR, ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup, ConfigDef.Width.LONG, "Partition Count for Topics in " + group); return configDef; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java index fde3f6944f72a..37ad594ab7650 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java @@ -34,7 +34,7 @@ public class WorkerTransactionContext implements TransactionContext { private static final Logger log = LoggerFactory.getLogger(WorkerTransactionContext.class); - private final Set commitableRecords = new HashSet<>(); + private final Set committableRecords = new HashSet<>(); private final Set abortableRecords = new HashSet<>(); private boolean batchCommitRequested = false; private boolean batchAbortRequested = false; @@ -47,7 +47,7 @@ public synchronized void commitTransaction() { @Override public synchronized void commitTransaction(SourceRecord record) { Objects.requireNonNull(record, "Source record used to define transaction boundaries may not be null"); - commitableRecords.add(record); + committableRecords.add(record); } @Override @@ -82,7 +82,7 @@ public 
synchronized boolean shouldCommitOn(SourceRecord record) { // Essentially, instead of telling the task that it screwed up and trusting it to do the right thing, we rat on it to the // worker and let it get punished accordingly. checkRecordRequestConsistency(record); - return commitableRecords.remove(record); + return committableRecords.remove(record); } public synchronized boolean shouldAbortOn(SourceRecord record) { @@ -97,7 +97,7 @@ private void checkBatchRequestsConsistency() { } private void checkRecordRequestConsistency(SourceRecord record) { - if (commitableRecords.contains(record) && abortableRecords.contains(record)) { + if (committableRecords.contains(record) && abortableRecords.contains(record)) { log.trace("Connector will fail as it has requested both commit and abort of transaction for same record: {}", record); throw new IllegalStateException(String.format( "Connector requested both commit and abort of same record against topic/partition %s/%s", diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 7fc038523635e..0dab3714f6d3d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -128,7 +128,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - // tolerate all erros + // tolerate all errors props.put(ERRORS_TOLERANCE_CONFIG, "all"); // retry for up to one second @@ -205,7 +205,7 @@ public void testErrantRecordReporter() throws Exception { props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - // tolerate all erros + // tolerate all errors props.put(ERRORS_TOLERANCE_CONFIG, "all"); // retry for up to one second diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 3a0090f2267b0..1946b01ee5de0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -399,7 +399,7 @@ private ConnectorConfig connConfig(Map connProps) { } @Test - public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception { + public void testErrorHandlingInSourceTasksWithBadConverter() throws Exception { Map reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index 72a2493e7f000..8d246cd04b3f9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -36,7 +36,7 @@ public class PluginDescTest { private final ClassLoader systemLoader = ClassLoader.getSystemClassLoader(); private final String regularVersion = "1.0.0"; private final String 
newerVersion = "1.0.1"; - private final String snaphotVersion = "1.0.0-SNAPSHOT"; + private final String snapshotVersion = "1.0.0-SNAPSHOT"; private final String noVersion = "undefined"; private PluginClassLoader pluginLoader; @@ -61,11 +61,11 @@ public void testRegularPluginDesc() { PluginDesc converterDesc = new PluginDesc<>( Converter.class, - snaphotVersion, + snapshotVersion, pluginLoader ); - assertPluginDesc(converterDesc, Converter.class, snaphotVersion, pluginLoader.location()); + assertPluginDesc(converterDesc, Converter.class, snapshotVersion, pluginLoader.location()); PluginDesc transformDesc = new PluginDesc<>( Transformation.class, @@ -90,11 +90,11 @@ public void testPluginDescWithSystemClassLoader() { PluginDesc converterDesc = new PluginDesc<>( Converter.class, - snaphotVersion, + snapshotVersion, systemLoader ); - assertPluginDesc(converterDesc, Converter.class, snaphotVersion, location); + assertPluginDesc(converterDesc, Converter.class, snapshotVersion, location); PluginDesc transformDesc = new PluginDesc<>( Transformation.class, @@ -136,13 +136,13 @@ public void testPluginDescWithNullVersion() { public void testPluginDescEquality() { PluginDesc connectorDescPluginPath = new PluginDesc<>( Connector.class, - snaphotVersion, + snapshotVersion, pluginLoader ); PluginDesc connectorDescClasspath = new PluginDesc<>( Connector.class, - snaphotVersion, + snapshotVersion, systemLoader ); @@ -204,7 +204,7 @@ public void testPluginDescComparison() { PluginDesc converterDescClasspath = new PluginDesc<>( Converter.class, - snaphotVersion, + snapshotVersion, systemLoader ); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index 19766989fcf46..3aa2cb5c352f6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -511,7 +511,7 @@ private List createBasicExpectedUrls() throws IOException { private void assertUrls(List expected, List actual) { Collections.sort(expected); - // not sorting 'actual' because it should be returned sorted from withing the PluginUtils. + // not sorting 'actual' because it should be returned sorted from within the PluginUtils. 
assertEquals(expected, actual); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java index ad5c88f9aeb21..0e7ad4d26b548 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java @@ -135,7 +135,7 @@ public void testAdvertisedUri() { Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString()); server.stop(); - // Listener is overriden by advertised values + // Listener is overridden by advertised values configMap = new HashMap<>(baseServerProps()); configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443"); configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 1f3478253076c..250c5a187f573 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -163,11 +163,11 @@ public class KafkaConfigBackingStoreTest { = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0); private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false); - private static final Struct INLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true); + private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true); private static final List RESTART_REQUEST_STRUCTS = Arrays.asList( new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true).put(INCLUDE_TASKS_FIELD_NAME, false), ONLY_FAILED_MISSING_STRUCT, - INLUDE_TASKS_MISSING_STRUCT); + INCLUDE_TASKS_MISSING_STRUCT); // The exact format doesn't matter here since both conversions are mocked private static final List CONFIGS_SERIALIZED = Arrays.asList( @@ -1413,7 +1413,7 @@ public void testRecordToRestartRequestOnlyFailedInconsistent() { public void testRecordToRestartRequestIncludeTasksInconsistent() { ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()); - Struct struct = INLUDE_TASKS_MISSING_STRUCT; + Struct struct = INCLUDE_TASKS_MISSING_STRUCT; SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), structToMap(struct)); RestartRequest restartRequest = configStorage.recordToRestartRequest(record, schemaAndValue); assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index f5c43c718a793..7e68584237da5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -343,7 
+343,7 @@ public Map> describeTopics(Set topicN Throwable cause = e.getCause(); if (cause instanceof UnknownTopicOrPartitionException) { results.put(topicName, Optional.empty()); - log.info("Found non-existant topic {}", topicName); + log.info("Found non-existent topic {}", topicName); continue; } throw new AssertionError("Could not describe topic(s)" + topicNames, e); From d54062c51b8f447073baa6a3e1d6aded86195947 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 22 Jun 2023 09:34:49 -0400 Subject: [PATCH 8/8] KAFKA-15098 Allow authorizers to be configured in ZK migration (#13895) Reviewers: Ron Dagostino --- .../main/scala/kafka/server/KafkaConfig.scala | 2 - .../kafka/zk/ZkMigrationIntegrationTest.scala | 54 +++++++++++++++++++ .../unit/kafka/server/KafkaConfigTest.scala | 7 +-- .../migration/KRaftMigrationDriver.java | 4 +- .../migration/MigrationDriverState.java | 2 +- 5 files changed, 59 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 44858e4b7642b..3c4770bd5502a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -2292,8 +2292,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } else { // ZK-based if (migrationEnabled) { - require(!originals.containsKey(KafkaConfig.AuthorizerClassNameProp), - s"ZooKeeper migration does not yet support authorizers. Remove ${KafkaConfig.AuthorizerClassNameProp} before performing a migration.") validateNonEmptyQuorumVotersForMigration() require(controllerListenerNames.nonEmpty, s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}") diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 02bc1b18726a0..fc5da56721f19 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -121,6 +121,60 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest( + brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES, + metadataVersion = MetadataVersion.IBP_3_4_IV0, + serverProperties = Array( + new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"), + new ClusterConfigProperty(key = "super.users", value = "User:ANONYMOUS"), + new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), + new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), + new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), + new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + ) + ) + def testStartZkBrokerWithAuthorizer(zkCluster: ClusterInstance): Unit = { + // Bootstrap the ZK cluster ID into KRaft + val clusterId = zkCluster.clusterId() + val kraftCluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0). + setClusterId(Uuid.fromString(clusterId)). + setNumBrokerNodes(0). 
+ setNumControllerNodes(1).build()) + .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") + .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .build() + try { + kraftCluster.format() + kraftCluster.startup() + val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) + + // Enable migration configs and restart brokers + log.info("Restart brokers in migration mode") + val clientProps = kraftCluster.controllerClientProperties() + val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG) + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") + zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters) + zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") + zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed + zkCluster.waitForReadyBrokers() + readyFuture.get(30, TimeUnit.SECONDS) + + val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient + TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over") + + def inDualWrite(): Boolean = { + val migrationState = kraftCluster.controllers().get(3000).migrationSupport.get.migrationDriver.migrationState().get(10, TimeUnit.SECONDS) + migrationState.allowDualWrite() + } + TestUtils.waitUntilTrue(() => inDualWrite(), "Timed out waiting for dual-write mode") + } finally { + shutdownInSequence(zkCluster, kraftCluster) + } + } + /** * Test ZkMigrationClient against a real ZooKeeper-backed Kafka cluster. This test creates a ZK cluster * and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index d54c1263ed1d5..8044877d6b05b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1669,12 +1669,9 @@ class KafkaConfigTest { // All needed configs are now set KafkaConfig.fromProps(props) - // Don't allow migration startup with an authorizer set + // Check that we allow authorizer to be set props.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getCanonicalName) - assertEquals( - "requirement failed: ZooKeeper migration does not yet support authorizers. 
Remove authorizer.class.name before performing a migration.", - assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage) - props.remove(KafkaConfig.AuthorizerClassNameProp) + KafkaConfig.fromProps(props) // Don't allow migration startup with an older IBP props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version()) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 0b8aa0341f919..de691fa36f97f 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -145,7 +145,7 @@ public void shutdown() throws InterruptedException { } // Visible for testing - CompletableFuture migrationState() { + public CompletableFuture migrationState() { CompletableFuture stateFuture = new CompletableFuture<>(); eventQueue.append(() -> stateFuture.complete(migrationState)); return stateFuture; @@ -328,7 +328,7 @@ public void onMetadataUpdate( /** * Construct and enqueue a {@link MetadataChangeEvent} with a given completion handler. In production use cases, - * this handler is a no-op. This method exists so we can add additional logic in our unit tests to wait for the + * this handler is a no-op. This method exists so that we can add additional logic in our unit tests to wait for the * enqueued event to finish executing. */ void enqueueMetadataChangeEvent( diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java index 2f35485564022..07d318d12098a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java @@ -55,7 +55,7 @@ public enum MigrationDriverState { this.allowDualWrite = allowDualWrite; } - boolean allowDualWrite() { + public boolean allowDualWrite() { return allowDualWrite; } }