diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index b5c54596b2195..affac79661095 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -21,7 +21,7 @@ import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} import kafka.server.KafkaConfig import kafka.utils.{Exit, Logging, ToolsUtils} import kafka.utils.Implicits._ -import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} +import kafka.zk.{ControllerZNode, KafkaZkClient, MigrationZNode, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.config.ZkConfigs @@ -260,12 +260,18 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { } private def run(enablePathCheck: Boolean): Unit = { + def skipSetAcl(path: String): Boolean = { + val isControllerPath = path == ControllerZNode.path + val isMigrationPath = path == MigrationZNode.path + (isControllerPath || isMigrationPath) && !zkClient.pathExists(path) + } + try { setAclIndividually("/") checkPathExistenceAndMaybeExit(enablePathCheck) for (path <- ZkData.SecureRootPaths) { debug("Going to set ACL for %s".format(path)) - if (path == ControllerZNode.path && !zkClient.pathExists(path)) { + if (skipSetAcl(path)) { debug("Ignoring to set ACL for %s, because it doesn't exist".format(path)) } else { zkClient.makeSurePersistentPathExists(path) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index cb22d3caccf5d..c447f22a1423b 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1734,20 +1734,32 @@ class KafkaZkClient private[zk] ( val getDataResponse = retryRequestUntilConnected(getDataRequest) getDataResponse.resultCode match { case Code.OK => - MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime) + Option(getDataResponse.data) match { + case Some(data) => + MigrationZNode.decode(data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime) + case None => + info("Migration znode exists with null data, recreating initial migration state") + createInitialMigrationState(initialState, removeFirst = true) + } case Code.NONODE => createInitialMigrationState(initialState) case _ => throw getDataResponse.resultException.get } } - private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = { - val createRequest = CreateRequest( + private def createInitialMigrationState(initialState: ZkMigrationLeadershipState, removeFirst: Boolean = false): ZkMigrationLeadershipState = { + val createOp = CreateOp( MigrationZNode.path, MigrationZNode.encode(initialState), defaultAcls(MigrationZNode.path), CreateMode.PERSISTENT) - val response = retryRequestUntilConnected(createRequest) + val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion) + val multi = if (removeFirst) { + MultiRequest(Seq(deleteOp, createOp)) + } else { + MultiRequest(Seq(createOp)) + } + val response = retryRequestUntilConnected(multi) response.maybeThrow() initialState.withMigrationZkVersion(0) } diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 4f5bec4d5276c..0fcf5cc1a945e 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -38,6 +38,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Time import org.apache.kafka.server.common.MetadataVersion import org.apache.zookeeper.client.ZKClientConfig +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ import scala.collection.Seq @@ -147,11 +149,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging { * Tests the migration tool when making an unsecure * cluster secure. */ - @Test - def testZkMigration(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testZkMigration(includeAllZnodes: Boolean): Unit = { val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false) try { - testMigration(zkConnect, unsecureZkClient, zkClient) + testMigration(zkConnect, unsecureZkClient, zkClient, includeAllZnodes) } finally { unsecureZkClient.close() } @@ -161,11 +164,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging { * Tests the migration tool when making a secure * cluster unsecure. */ - @Test - def testZkAntiMigration(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testZkAntiMigration(includeAllZnodes: Boolean): Unit = { val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false) try { - testMigration(zkConnect, zkClient, unsecureZkClient) + testMigration(zkConnect, zkClient, unsecureZkClient, includeAllZnodes) } finally { unsecureZkClient.close() } @@ -218,9 +222,17 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging { * Exercises the migration tool. It is used in these test cases: * testZkMigration, testZkAntiMigration, testChroot. */ - private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = { + private def testMigration( + zkUrl: String, + firstZk: KafkaZkClient, + secondZk: KafkaZkClient, + includeAllZnodes: Boolean = true): Unit = { info(s"zkConnect string: $zkUrl") - for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) { + // Optionally do not create controller and migration znodes + val predicate: String => Boolean = if (includeAllZnodes) _ => true else skipCreateZnodes + val paths = (ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths).filter(predicate) + + for (path <- paths) { info(s"Creating $path") firstZk.makeSurePersistentPathExists(path) // Create a child for each znode to exercise the recurrent @@ -241,7 +253,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging { } ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl")) info("Done with migration") - for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) { + for (path <- paths) { val sensitive = ZkData.sensitivePath(path) val listParent = secondZk.getAcl(path) assertTrue(isAclCorrect(listParent, secondZk.secure, sensitive), path) @@ -257,6 +269,18 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging { ZkData.sensitivePath(ExtendedAclZNode.path)), "/kafka-acl-extended") assertTrue(isAclCorrect(firstZk.getAcl("/feature"), secondZk.secure, ZkData.sensitivePath(FeatureZNode.path)), "ACL mismatch for /feature path") + + if (!includeAllZnodes) { + // Check controller and migration znodes should not be created + assertFalse(firstZk.pathExists(ControllerZNode.path)) + assertFalse(firstZk.pathExists(MigrationZNode.path)) + } + } + + private def skipCreateZnodes(path: String): Boolean = { + val isNotControllerPath = path != ControllerZNode.path + val isNotMigrationPath = path != MigrationZNode.path + isNotControllerPath && isNotMigrationPath } /** diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index cef3dad7ac89f..b3041dd0807e4 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness { } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER) } + @Test + def testMigrationZnodeWithNullValue(): Unit = { + val (controllerEpoch, stat) = zkClient.getControllerEpoch.get + var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion) + zkClient.retryRequestUntilConnected(CreateRequest( + MigrationZNode.path, + null, + zkClient.defaultAcls(MigrationZNode.path), + CreateMode.PERSISTENT)) + + migrationState = zkClient.getOrCreateMigrationState(migrationState) + + assertEquals(0, migrationState.migrationZkVersion()) + } + @Test def testFailToUpdateMigrationZNode(): Unit = { val (controllerEpoch, stat) = zkClient.getControllerEpoch.get diff --git a/docs/ops.html b/docs/ops.html index e8309187a5686..f5f95ffaf4449 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4064,6 +4064,8 @@

Provisioning the KRaft controller quorum

The new standalone controller in the example configuration above should be formatted using the kafka-storage format --standalonecommand.

+

Note: The migration can stall if the ZooKeeper Security Migration Tool was previously executed (fixed from 3.9.2, see KAFKA-19026 for more details). As a workaround, the malformed "/migration" node can be removed from ZooKeeper by running delete /migration with the zookeeper-shell.sh CLI tool.

+

Note: The KRaft cluster node.id values must be different from any existing ZK broker broker.id. In KRaft-mode, the brokers and controllers share the same Node ID namespace.