@@ -3190,6 +3190,42 @@ public void testListGroupsEmptyGroupType() throws Exception {
31903190 }
31913191 }
31923192
3193+ @Test
3194+ public void testListGroupsWithProtocolTypes() throws Exception {
3195+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
3196+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
3197+
3198+ // Test with list group options.
3199+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
3200+
3201+ env.kafkaClient().prepareResponseFrom(
3202+ expectListGroupsRequestWithFilters(Set.of(), Set.of()),
3203+ new ListGroupsResponse(new ListGroupsResponseData()
3204+ .setErrorCode(Errors.NONE.code())
3205+ .setGroups(List.of(
3206+ new ListGroupsResponseData.ListedGroup()
3207+ .setGroupId("group-1")
3208+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
3209+ .setGroupState("Stable")
3210+ .setGroupType(GroupType.CONSUMER.toString()),
3211+ new ListGroupsResponseData.ListedGroup()
3212+ .setGroupId("group-2")
3213+ .setGroupState("Empty")
3214+ .setGroupType(GroupType.CONSUMER.toString())))),
3215+ env.cluster().nodeById(0));
3216+
3217+ final ListGroupsOptions options = new ListGroupsOptions().withProtocolTypes(Set.of(""));
3218+ final ListGroupsResult result = env.adminClient().listGroups(options);
3219+ Collection<GroupListing> listing = result.valid().get();
3220+
3221+ assertEquals(1, listing.size());
3222+ List<GroupListing> expected = new ArrayList<>();
3223+ expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY)));
3224+ assertEquals(expected, listing);
3225+ assertEquals(0, result.errors().get().size());
3226+ }
3227+ }
3228+
31933229 @Test
31943230 public void testListGroupsWithTypes() throws Exception {
31953231 try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -3432,6 +3468,42 @@ public void testListConsumerGroupsWithStates() throws Exception {
34323468 }
34333469 }
34343470
3471+ @Test
3472+ public void testListConsumerGroupsWithProtocolTypes() throws Exception {
3473+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
3474+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
3475+
3476+ // Test with a specific protocol type filter in list consumer group options.
3477+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
3478+
3479+ env.kafkaClient().prepareResponseFrom(
3480+ expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())),
3481+ new ListGroupsResponse(new ListGroupsResponseData()
3482+ .setErrorCode(Errors.NONE.code())
3483+ .setGroups(List.of(
3484+ new ListGroupsResponseData.ListedGroup()
3485+ .setGroupId("group-1")
3486+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
3487+ .setGroupState("Stable")
3488+ .setGroupType(GroupType.CONSUMER.toString()),
3489+ new ListGroupsResponseData.ListedGroup()
3490+ .setGroupId("group-2")
3491+ .setGroupState("Empty")
3492+ .setGroupType(GroupType.CONSUMER.toString())))),
3493+ env.cluster().nodeById(0));
3494+
3495+ final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
3496+ final ListGroupsResult result = env.adminClient().listGroups(options);
3497+ Collection<GroupListing> listings = result.valid().get();
3498+
3499+ assertEquals(1, listings.size());
3500+ List<GroupListing> expected = new ArrayList<>();
3501+ expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
3502+ assertEquals(expected, listings);
3503+ assertEquals(0, result.errors().get().size());
3504+ }
3505+ }
3506+
34353507 @Test
34363508 public void testListConsumerGroupsWithTypes() throws Exception {
34373509 try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
0 commit comments