Skip to content

Commit ef07b5f

Browse files
authored
KAFKA-19461: Add share group admin integration tests to PlaintextAdminIntegrationTest (#20103)
Add its for `Admin.deleteShareGroupOffsets`, `Admin.alterShareGroupOffsets` and `Admin.listShareGroupOffsets` to `PlaintextAdminIntegrationTest`. Reviewers: Andrew Schofield <[email protected]>
1 parent 24d03a1 commit ef07b5f

File tree

1 file changed

+217
-34
lines changed

1 file changed

+217
-34
lines changed

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 217 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2698,6 +2698,34 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
26982698
}
26992699
}
27002700

2701+
/**
2702+
* Verify that initially there are no share groups to list.
2703+
*/
2704+
private def assertNoShareGroupsExist(): Unit = {
2705+
val list = client.listGroups()
2706+
assertEquals(0, list.all().get().size())
2707+
assertEquals(0, list.errors().get().size())
2708+
assertEquals(0, list.valid().get().size())
2709+
}
2710+
2711+
private def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], topic: String, latch: CountDownLatch): Thread = {
2712+
new Thread {
2713+
override def run : Unit = {
2714+
consumer.subscribe(util.Set.of(topic))
2715+
try {
2716+
while (true) {
2717+
consumer.poll(JDuration.ofSeconds(5))
2718+
if (latch.getCount > 0L)
2719+
latch.countDown()
2720+
consumer.commitSync()
2721+
}
2722+
} catch {
2723+
case _: InterruptException => // Suppress the output to stderr
2724+
}
2725+
}
2726+
}
2727+
}
2728+
27012729
@Test
27022730
def testShareGroups(): Unit = {
27032731
val testGroupId = "test_group_id"
@@ -2715,46 +2743,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
27152743

27162744
val consumerSet = Set(createShareConsumer(configOverrides = createProperties()))
27172745
val topicSet = Set(testTopicName)
2718-
27192746
val latch = new CountDownLatch(consumerSet.size)
27202747

2721-
def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], topic: String): Thread = {
2722-
new Thread {
2723-
override def run : Unit = {
2724-
consumer.subscribe(util.Set.of(topic))
2725-
try {
2726-
while (true) {
2727-
consumer.poll(JDuration.ofSeconds(5))
2728-
if (latch.getCount > 0L)
2729-
latch.countDown()
2730-
consumer.commitSync()
2731-
}
2732-
} catch {
2733-
case _: InterruptException => // Suppress the output to stderr
2734-
}
2735-
}
2736-
}
2737-
}
2738-
27392748
val config = createConfig
27402749
client = Admin.create(config)
2741-
val producer = createProducer()
27422750
try {
2743-
// Verify that initially there are no share groups to list.
2744-
val list = client.listGroups()
2745-
assertEquals(0, list.all().get().size())
2746-
assertEquals(0, list.errors().get().size())
2747-
assertEquals(0, list.valid().get().size())
2748-
2749-
client.createTopics(util.Set.of(
2750-
new NewTopic(testTopicName, testNumPartitions, 1.toShort)
2751-
)).all().get()
2752-
waitForTopics(client, List(testTopicName), List())
2753-
2754-
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
2751+
assertNoShareGroupsExist()
2752+
prepareTopics(List(testTopicName), testNumPartitions)
2753+
prepareRecords(testTopicName)
27552754

27562755
// Start consumers in a thread that will subscribe to a new group.
2757-
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createShareConsumerThread(zipped._1, zipped._2))
2756+
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createShareConsumerThread(zipped._1, zipped._2, latch))
27582757

27592758
try {
27602759
consumerThreads.foreach(_.start())
@@ -2846,7 +2845,191 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
28462845
}
28472846
} finally {
28482847
consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
2849-
Utils.closeQuietly(producer, "producer")
2848+
Utils.closeQuietly(client, "adminClient")
2849+
}
2850+
}
2851+
2852+
@Test
2853+
def testDeleteShareGroupOffsets(): Unit = {
2854+
val config = createConfig
2855+
client = Admin.create(config)
2856+
val testTopicName = "test_topic"
2857+
val testGroupId = "test_group_id"
2858+
val testClientId = "test_client_id"
2859+
val fakeGroupId = "fake_group_id"
2860+
val fakeTopicName = "foo"
2861+
2862+
try {
2863+
prepareTopics(List(testTopicName), 1)
2864+
prepareRecords(testTopicName)
2865+
2866+
val newShareConsumerConfig = new Properties(consumerConfig)
2867+
newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
2868+
newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
2869+
2870+
Using.resource(createShareConsumer(configOverrides = newShareConsumerConfig)) { consumer =>
2871+
consumer.subscribe(util.List.of(testTopicName))
2872+
consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
2873+
consumer.commitSync()
2874+
2875+
// listGroups is used to list share groups
2876+
// Test that we can list the new group.
2877+
TestUtils.waitUntilTrue(() => {
2878+
client.listGroups.all.get.stream().filter(group =>
2879+
group.groupId == testGroupId &&
2880+
group.groupState.get == GroupState.STABLE).count() == 1
2881+
}, s"Expected to be able to list $testGroupId")
2882+
2883+
// Test offset deletion while consuming
2884+
val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, util.Set.of(testTopicName, fakeTopicName))
2885+
2886+
// Deleting the offset with real group ID should get GroupNotEmptyException
2887+
assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.all())
2888+
assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.topicResult(testTopicName))
2889+
assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.topicResult(fakeTopicName))
2890+
2891+
// Test the fake group ID
2892+
val fakeDeleteResult = client.deleteShareGroupOffsets(fakeGroupId, util.Set.of(testTopicName, fakeTopicName))
2893+
2894+
assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.all())
2895+
assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.topicResult(testTopicName))
2896+
assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.topicResult(fakeTopicName))
2897+
}
2898+
2899+
// Test offset deletion when group is empty
2900+
val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, util.Set.of(testTopicName, fakeTopicName))
2901+
2902+
assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.all())
2903+
assertNull(offsetDeleteResult.topicResult(testTopicName).get())
2904+
assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.topicResult(fakeTopicName))
2905+
2906+
val tp1 = new TopicPartition(testTopicName, 0)
2907+
val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
2908+
.partitionsToOffsetAndMetadata(testGroupId)
2909+
.get()
2910+
assertTrue(parts.containsKey(tp1))
2911+
assertNull(parts.get(tp1))
2912+
} finally {
2913+
Utils.closeQuietly(client, "adminClient")
2914+
}
2915+
}
2916+
2917+
@Test
2918+
def testAlterShareGroupOffsets(): Unit = {
2919+
val config = createConfig
2920+
client = Admin.create(config)
2921+
val testTopicName = "test_topic"
2922+
val testGroupId = "test_group_id"
2923+
val testClientId = "test_client_id"
2924+
val fakeGroupId = "fake_group_id"
2925+
val fakeTopicName = "foo"
2926+
2927+
val tp1 = new TopicPartition(testTopicName, 0)
2928+
val tp2 = new TopicPartition(fakeTopicName, 0)
2929+
try {
2930+
prepareTopics(List(testTopicName), 1)
2931+
prepareRecords(testTopicName)
2932+
2933+
val newShareConsumerConfig = new Properties(consumerConfig)
2934+
newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
2935+
newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
2936+
2937+
Using.resource(createShareConsumer(configOverrides = newShareConsumerConfig)) { consumer =>
2938+
consumer.subscribe(util.List.of(testTopicName))
2939+
consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
2940+
consumer.commitSync()
2941+
2942+
// listGroups is used to list share groups
2943+
// Test that we can list the new group.
2944+
TestUtils.waitUntilTrue(() => {
2945+
client.listGroups.all.get.stream().filter(group =>
2946+
group.groupId == testGroupId &&
2947+
group.groupState.get == GroupState.STABLE).count() == 1
2948+
}, s"Expected to be able to list $testGroupId")
2949+
2950+
// Test offset alter while consuming
2951+
val offsetAlterResult = client.alterShareGroupOffsets(testGroupId, util.Map.of(tp1, 0, tp2, 0))
2952+
2953+
// Altering the offset with real group ID should get GroupNotEmptyException
2954+
assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.all())
2955+
assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.partitionResult(tp1))
2956+
assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.partitionResult(tp2))
2957+
2958+
// Test the fake group ID
2959+
val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId, util.Map.of(tp1, 0, tp2, 0))
2960+
2961+
assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.all())
2962+
assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.partitionResult(tp1))
2963+
assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.partitionResult(tp2))
2964+
}
2965+
2966+
// Test offset alter when group is empty
2967+
val offsetAlterResult = client.alterShareGroupOffsets(testGroupId, util.Map.of(tp1, 0, tp2, 0))
2968+
2969+
assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetAlterResult.all())
2970+
assertNull(offsetAlterResult.partitionResult(tp1).get())
2971+
assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetAlterResult.partitionResult(tp2))
2972+
2973+
val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
2974+
.partitionsToOffsetAndMetadata(testGroupId)
2975+
.get()
2976+
assertTrue(parts.containsKey(tp1))
2977+
assertEquals(0, parts.get(tp1).offset())
2978+
} finally {
2979+
Utils.closeQuietly(client, "adminClient")
2980+
}
2981+
}
2982+
2983+
@Test
2984+
def testListShareGroupOffsets(): Unit = {
2985+
val config = createConfig
2986+
client = Admin.create(config)
2987+
val testTopicName = "test_topic"
2988+
val testGroupId = "test_group_id"
2989+
val testClientId = "test_client_id"
2990+
2991+
val newShareConsumerConfig = new Properties(consumerConfig)
2992+
newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
2993+
newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
2994+
val consumerSet = Set(createShareConsumer(configOverrides = newShareConsumerConfig))
2995+
val topicSet = Set(testTopicName)
2996+
val latch = new CountDownLatch(consumerSet.size)
2997+
2998+
try {
2999+
assertNoShareGroupsExist()
3000+
prepareTopics(List(testTopicName), 2)
3001+
prepareRecords(testTopicName)
3002+
3003+
// Start consumers in a thread that will subscribe to a new group.
3004+
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createShareConsumerThread(zipped._1, zipped._2, latch))
3005+
try {
3006+
consumerThreads.foreach(_.start())
3007+
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
3008+
val tp1 = new TopicPartition(testTopicName, 0)
3009+
val tp2 = new TopicPartition(testTopicName, 1)
3010+
3011+
// Test listShareGroupOffsets
3012+
TestUtils.waitUntilTrue(() => {
3013+
val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec()))
3014+
.partitionsToOffsetAndMetadata(testGroupId)
3015+
.get()
3016+
parts.containsKey(tp1) && parts.containsKey(tp2)
3017+
}, "Expected the result contains all partitions.")
3018+
3019+
// Test listShareGroupOffsets with listShareGroupOffsetsSpec
3020+
val groupSpecs = util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))
3021+
val parts = client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get()
3022+
assertTrue(parts.containsKey(tp1))
3023+
assertFalse(parts.containsKey(tp2))
3024+
} finally {
3025+
consumerThreads.foreach {
3026+
case consumerThread =>
3027+
consumerThread.interrupt()
3028+
consumerThread.join()
3029+
}
3030+
}
3031+
} finally {
3032+
consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
28503033
Utils.closeQuietly(client, "adminClient")
28513034
}
28523035
}

0 commit comments

Comments
 (0)