Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
import kafka.server.KafkaConfig
import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import kafka.zk.{ControllerZNode, KafkaZkClient, MigrationZNode, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.config.ZkConfigs
Expand Down Expand Up @@ -260,12 +260,18 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}

private def run(enablePathCheck: Boolean): Unit = {
def skipSetAcl(path: String): Boolean = {
val isControllerPath = path == ControllerZNode.path
val isMigrationPath = path == MigrationZNode.path
(isControllerPath || isMigrationPath) && !zkClient.pathExists(path)
}

try {
setAclIndividually("/")
checkPathExistenceAndMaybeExit(enablePathCheck)
for (path <- ZkData.SecureRootPaths) {
debug("Going to set ACL for %s".format(path))
if (path == ControllerZNode.path && !zkClient.pathExists(path)) {
if (skipSetAcl(path)) {
debug("Ignoring to set ACL for %s, because it doesn't exist".format(path))
} else {
zkClient.makeSurePersistentPathExists(path)
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1734,20 +1734,32 @@ class KafkaZkClient private[zk] (
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
Option(getDataResponse.data) match {
case Some(data) =>
MigrationZNode.decode(data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
case None =>
info("Migration znode exists with null data, recreating initial migration state")
createInitialMigrationState(initialState, removeFirst = true)
}
case Code.NONODE =>
createInitialMigrationState(initialState)
case _ => throw getDataResponse.resultException.get
}
}

private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val createRequest = CreateRequest(
private def createInitialMigrationState(initialState: ZkMigrationLeadershipState, removeFirst: Boolean = false): ZkMigrationLeadershipState = {
val createOp = CreateOp(
MigrationZNode.path,
MigrationZNode.encode(initialState),
defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT)
val response = retryRequestUntilConnected(createRequest)
val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion)
val multi = if (removeFirst) {
MultiRequest(Seq(deleteOp, createOp))
} else {
MultiRequest(Seq(createOp))
}
val response = retryRequestUntilConnected(multi)
response.maybeThrow()
initialState.withMigrationZkVersion(0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.MetadataVersion
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.jdk.CollectionConverters._
import scala.collection.Seq
Expand Down Expand Up @@ -147,11 +149,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
* Tests the migration tool when making an unsecure
* cluster secure.
*/
@Test
def testZkMigration(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testZkMigration(includeAllZnodes: Boolean): Unit = {
val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
testMigration(zkConnect, unsecureZkClient, zkClient)
testMigration(zkConnect, unsecureZkClient, zkClient, includeAllZnodes)
} finally {
unsecureZkClient.close()
}
Expand All @@ -161,11 +164,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
* Tests the migration tool when making a secure
* cluster unsecure.
*/
@Test
def testZkAntiMigration(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testZkAntiMigration(includeAllZnodes: Boolean): Unit = {
val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
testMigration(zkConnect, zkClient, unsecureZkClient)
testMigration(zkConnect, zkClient, unsecureZkClient, includeAllZnodes)
} finally {
unsecureZkClient.close()
}
Expand Down Expand Up @@ -218,9 +222,17 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
* Exercises the migration tool. It is used in these test cases:
* testZkMigration, testZkAntiMigration, testChroot.
*/
private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = {
private def testMigration(
zkUrl: String,
firstZk: KafkaZkClient,
secondZk: KafkaZkClient,
includeAllZnodes: Boolean = true): Unit = {
info(s"zkConnect string: $zkUrl")
for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
// Optionally do not create controller and migration znodes
val predicate: String => Boolean = if (includeAllZnodes) _ => true else skipCreateZnodes
val paths = (ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths).filter(predicate)

for (path <- paths) {
info(s"Creating $path")
firstZk.makeSurePersistentPathExists(path)
// Create a child for each znode to exercise the recurrent
Expand All @@ -241,7 +253,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
}
ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
info("Done with migration")
for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
for (path <- paths) {
val sensitive = ZkData.sensitivePath(path)
val listParent = secondZk.getAcl(path)
assertTrue(isAclCorrect(listParent, secondZk.secure, sensitive), path)
Expand All @@ -257,6 +269,18 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
ZkData.sensitivePath(ExtendedAclZNode.path)), "/kafka-acl-extended")
assertTrue(isAclCorrect(firstZk.getAcl("/feature"), secondZk.secure,
ZkData.sensitivePath(FeatureZNode.path)), "ACL mismatch for /feature path")

if (!includeAllZnodes) {
// Check controller and migration znodes should not be created
assertFalse(firstZk.pathExists(ControllerZNode.path))
assertFalse(firstZk.pathExists(MigrationZNode.path))
}
}

private def skipCreateZnodes(path: String): Boolean = {
val isNotControllerPath = path != ControllerZNode.path
val isNotMigrationPath = path != MigrationZNode.path
isNotControllerPath && isNotMigrationPath
}

/**
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
} finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
}

@Test
def testMigrationZnodeWithNullValue(): Unit = {
val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
zkClient.retryRequestUntilConnected(CreateRequest(
MigrationZNode.path,
null,
zkClient.defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT))

migrationState = zkClient.getOrCreateMigrationState(migrationState)

assertEquals(0, migrationState.migrationZkVersion())
}

@Test
def testFailToUpdateMigrationZNode(): Unit = {
val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
Expand Down
2 changes: 2 additions & 0 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -4064,6 +4064,8 @@ <h3>Provisioning the KRaft controller quorum</h3>

<p>The new standalone controller in the example configuration above should be formatted using the <code>kafka-storage format --standalone</code>command.</p>

<p>Note: The migration can stall if the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a> was previously executed (fixed from 3.9.2, see <a href="https://issues.apache.org/jira/browse/KAFKA-19480">KAFKA-19026</a> for more details). As a workaround, the malformed "/migration" node can be removed from ZooKeeper by running <code>delete /migration</code> with the <code>zookeeper-shell.sh</code> CLI tool.</p>

<p><em>Note: The KRaft cluster <code>node.id</code> values must be different from any existing ZK broker <code>broker.id</code>.
In KRaft-mode, the brokers and controllers share the same Node ID namespace.</em></p>

Expand Down