Skip to content

Commit 2267518

Browse files
authored
KAFKA-17407 Fix flaky ShareGroupHeartbeatRequestTest.testPartitionAssignmentWithChangingTopics (apache#16994)
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 6b28e81 commit 2267518

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
407407
controllers = raftCluster.controllers().values().asScala.toSeq
408408
)
409409
// Heartbeat request to join the group. Note that the member subscribes
410-
// to an nonexistent topic.
410+
// to a nonexistent topic.
411411
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
412412
new ShareGroupHeartbeatRequestData()
413413
.setGroupId("grp")
@@ -457,6 +457,9 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
457457
true
458458
).build()
459459

460+
cluster.waitForTopic("foo", 2)
461+
cluster.waitForTopic("bar", 3)
462+
460463
TestUtils.waitUntilTrue(() => {
461464
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
462465
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
@@ -465,7 +468,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
465468
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
466469
}, msg = s"Could not get partitions for topic foo and bar assigned. Last response $shareGroupHeartbeatResponse.")
467470
// Verify the response.
468-
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
471+
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
469472
// Create the topic baz.
470473
val bazTopicId = TestUtils.createTopicWithAdminRaw(
471474
admin = admin,
@@ -489,7 +492,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
489492
new ShareGroupHeartbeatRequestData()
490493
.setGroupId("grp")
491494
.setMemberId(memberId)
492-
.setMemberEpoch(3),
495+
.setMemberEpoch(2),
493496
true
494497
).build()
495498

@@ -501,7 +504,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
501504
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
502505
}, msg = s"Could not get partitions for topic baz assigned. Last response $shareGroupHeartbeatResponse.")
503506
// Verify the response.
504-
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
507+
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
505508
// Increasing the partitions of topic bar which is already being consumed in the share group.
506509
increasePartitions(admin, "bar", 6, Seq.empty)
507510

@@ -521,7 +524,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
521524
new ShareGroupHeartbeatRequestData()
522525
.setGroupId("grp")
523526
.setMemberId(memberId)
524-
.setMemberEpoch(4),
527+
.setMemberEpoch(3),
525528
true
526529
).build()
527530

@@ -533,7 +536,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
533536
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
534537
}, msg = s"Could not update partitions assignment for topic bar. Last response $shareGroupHeartbeatResponse.")
535538
// Verify the response.
536-
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
539+
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
537540
// Delete the topic foo.
538541
TestUtils.deleteTopicWithAdmin(
539542
admin = admin,
@@ -555,7 +558,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
555558
new ShareGroupHeartbeatRequestData()
556559
.setGroupId("grp")
557560
.setMemberId(memberId)
558-
.setMemberEpoch(5),
561+
.setMemberEpoch(4),
559562
true
560563
).build()
561564

@@ -567,7 +570,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
567570
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
568571
}, msg = s"Could not update partitions assignment for topic foo. Last response $shareGroupHeartbeatResponse.")
569572
// Verify the response.
570-
assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
573+
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
571574
}
572575

573576
@ClusterTest(

0 commit comments

Comments
 (0)