Skip to content

Commit 0bc2c6e

Browse files
authored
MINOR: Move topic creation before consumer creation in testListGroups integration test (#20496)
This PR moves the topic creation before consumer creations in `PlaintextAdminIntegrationTest.testListGroups`, to avoid potential errors if consumer creates topic due to metadata update. See discussion #20244 (comment) Reviewers: @chia7712, [email protected]
1 parent 9351270 commit 0bc2c6e

File tree

1 file changed

+20
-20
lines changed

1 file changed

+20
-20
lines changed

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2580,6 +2580,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
25802580
val config = createConfig
25812581
client = Admin.create(config)
25822582

2583+
client.createTopics(util.Set.of(
2584+
new NewTopic(testTopicName, 1, 1.toShort)
2585+
)).all().get()
2586+
waitForTopics(client, List(testTopicName), List())
2587+
val topicPartition = new TopicPartition(testTopicName, 0)
2588+
25832589
consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name)
25842590
val classicGroupConfig = new Properties(consumerConfig)
25852591
classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
@@ -2600,12 +2606,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
26002606
)
26012607

26022608
try {
2603-
client.createTopics(util.Set.of(
2604-
new NewTopic(testTopicName, 1, 1.toShort)
2605-
)).all().get()
2606-
waitForTopics(client, List(testTopicName), List())
2607-
val topicPartition = new TopicPartition(testTopicName, 0)
2608-
26092609
classicGroup.subscribe(util.Set.of(testTopicName))
26102610
classicGroup.poll(JDuration.ofMillis(1000))
26112611
consumerGroup.subscribe(util.Set.of(testTopicName))
@@ -2628,20 +2628,22 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
26282628
val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE))
26292629
val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
26302630
val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY))
2631-
// Streams group could either be in STABLE or NOT_READY state
2632-
val streamsGroupListingStable = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
2633-
val streamsGroupListingNotReady = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY))
2631+
val streamsGroupListing = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
26342632

26352633
var listGroupsResult = client.listGroups()
26362634
assertTrue(listGroupsResult.errors().get().isEmpty)
26372635

2638-
val expectedStreamListings = Set(streamsGroupListingStable, streamsGroupListingNotReady)
2639-
val expectedListings = Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing)
2640-
val actualListings = listGroupsResult.all().get().asScala.toSet
2641-
2642-
// Check that actualListings contains all expectedListings and one of the streams listings
2643-
assertTrue(expectedListings.subsetOf(actualListings))
2644-
assertTrue(actualListings.exists(expectedStreamListings.contains))
2636+
TestUtils.waitUntilTrue(() => {
2637+
val listGroupResultScala = client.listGroups().all().get().asScala
2638+
val filteredStreamsGroups = listGroupResultScala.filter(_.groupId() == streamsGroupId)
2639+
val filteredClassicGroups = listGroupResultScala.filter(_.groupId() == classicGroupId)
2640+
val filteredConsumerGroups = listGroupResultScala.filter(_.groupId() == consumerGroupId)
2641+
val filteredShareGroups = listGroupResultScala.filter(_.groupId() == shareGroupId)
2642+
filteredClassicGroups.forall(_.groupState().orElse(null) == GroupState.STABLE) &&
2643+
filteredConsumerGroups.forall(_.groupState().orElse(null) == GroupState.STABLE) &&
2644+
filteredShareGroups.forall(_.groupState().orElse(null) == GroupState.STABLE) &&
2645+
filteredStreamsGroups.forall(_.groupState().orElse(null) == GroupState.STABLE)
2646+
}, "Groups not stable yet")
26452647

26462648
listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC)))
26472649
assertTrue(listGroupsResult.errors().get().isEmpty)
@@ -2660,10 +2662,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
26602662

26612663
listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS)))
26622664
assertTrue(listGroupsResult.errors().get().isEmpty)
2663-
assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable)) ||
2664-
listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
2665-
assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable)) ||
2666-
listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
2665+
assertEquals(Set(streamsGroupListing), listGroupsResult.all().get().asScala.toSet)
2666+
assertEquals(Set(streamsGroupListing), listGroupsResult.valid().get().asScala.toSet)
26672667

26682668
} finally {
26692669
Utils.closeQuietly(classicGroup, "classicGroup")

0 commit comments

Comments
 (0)