Skip to content

Commit fd0c376

Browse files
jim0987795064Ubuntu
authored andcommitted
KAFKA-18105 Fix flaky PlaintextAdminIntegrationTest#testElectPreferredLeaders (apache#20068)
## Changes This PR improves the stability of the PlaintextAdminIntegrationTest.testElectPreferredLeaders test by introducing short Thread.sleep( ) delays before invoking: - changePreferredLeader( ) - waitForBrokersOutOfIsr( ) ## Reasons - Metadata propagation for partition2 : Kafka requires time to propagate the updated leader metadata across all brokers. Without waiting, metadataCache may return outdated leader information for partition2. - Eviction of broker1 from the ISR : To simulate a scenario where broker1 is no longer eligible as leader, the test relies on broker1 being removed from the ISR (e.g., due to intentional shutdown). This eviction is not instantaneous and requires a brief delay before Kafka reflects the change. Reviewers: PoAn Yang <[email protected]>, TengYao Chi <[email protected]>, TaiJuWu <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 9178f89 commit fd0c376

File tree

1 file changed

+26
-4
lines changed

1 file changed

+26
-4
lines changed

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3061,6 +3061,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
30613061
}
30623062
}
30633063

3064+
/**
3065+
* Waits until the metadata for the given partition has fully propagated and become consistent across all brokers.
3066+
*
3067+
* @param partition The partition whose leader metadata should be verified across all brokers.
3068+
*/
3069+
def waitForBrokerMetadataPropagation(partition: TopicPartition): Unit = {
3070+
while (brokers.exists(_.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).isEmpty) ||
3071+
brokers.map(_.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName))
3072+
.filter(_.isPresent)
3073+
.map(_.get())
3074+
.toSet.size != 1)
3075+
TimeUnit.MILLISECONDS.sleep(300)
3076+
}
3077+
30643078
@Test
30653079
def testElectPreferredLeaders(): Unit = {
30663080
client = createAdminClient
@@ -3087,12 +3101,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
30873101
val prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id()
30883102
val prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()
30893103

3090-
var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
3104+
var reassignmentMap = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
30913105
if (prior1 != preferred)
3092-
m += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
3106+
reassignmentMap += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
30933107
if (prior2 != preferred)
3094-
m += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
3095-
client.alterPartitionReassignments(m.asJava).all().get()
3108+
reassignmentMap += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
3109+
client.alterPartitionReassignments(reassignmentMap.asJava).all().get()
30963110

30973111
TestUtils.waitUntilTrue(
30983112
() => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
@@ -3120,6 +3134,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
31203134
TestUtils.assertLeader(client, partition2, 0)
31213135

31223136
// Now change the preferred leader to 1
3137+
waitForBrokerMetadataPropagation(partition1)
3138+
waitForBrokerMetadataPropagation(partition2)
31233139
changePreferredLeader(prefer1)
31243140

31253141
// meaningful election
@@ -3158,6 +3174,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
31583174
TestUtils.assertLeader(client, partition2, 1)
31593175

31603176
// Now change the preferred leader to 2
3177+
waitForBrokerMetadataPropagation(partition1)
3178+
waitForBrokerMetadataPropagation(partition2)
31613179
changePreferredLeader(prefer2)
31623180

31633181
// mixed results
@@ -3174,9 +3192,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
31743192
TestUtils.assertLeader(client, partition2, 2)
31753193

31763194
// Now change the preferred leader to 1
3195+
waitForBrokerMetadataPropagation(partition1)
3196+
waitForBrokerMetadataPropagation(partition2)
31773197
changePreferredLeader(prefer1)
31783198
// but shut it down...
31793199
killBroker(1)
3200+
waitForBrokerMetadataPropagation(partition1)
3201+
waitForBrokerMetadataPropagation(partition2)
31803202
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
31813203

31823204
def assertPreferredLeaderNotAvailable(

0 commit comments

Comments
 (0)