1919from confluent_kafka .admin import (AclBinding , AclBindingFilter , ResourceType ,
2020 ResourcePatternType , AclOperation , AclPermissionType )
2121from confluent_kafka .error import ConsumeError
22- from confluent_kafka import ConsumerGroupState , TopicCollection
22+ from confluent_kafka import ConsumerGroupState , TopicCollection , ConsumerGroupType
2323
2424from tests .common import TestUtils
2525
@@ -30,12 +30,16 @@ def verify_commit_result(err, _):
3030 assert err is not None
3131
3232
33- def consume_messages (sasl_cluster , group_id , topic , num_messages = None ):
33+ def consume_messages (sasl_cluster , group_id , group_protocol , topic , num_messages = None ):
3434 conf = {'group.id' : group_id ,
35- 'session.timeout.ms ' : 6000 ,
35+ 'group.protocol ' : group_protocol ,
3636 'enable.auto.commit' : False ,
3737 'on_commit' : verify_commit_result ,
3838 'auto.offset.reset' : 'earliest' }
39+
40+ if group_protocol == 'classic' :
41+ conf ['session.timeout.ms' ] = 6000
42+
3943 consumer = sasl_cluster .consumer (conf )
4044 consumer .subscribe ([topic ])
4145 read_messages = 0
@@ -164,7 +168,7 @@ def verify_describe_groups(cluster, admin_client, topic):
164168
165169 # Consume some messages for the group
166170 group = 'test-group'
167- consume_messages (cluster , group , topic , 2 )
171+ consume_messages (cluster , group , 'classic' , topic , 2 )
168172
169173 # Verify Describe Consumer Groups
170174 desc = verify_provided_describe_for_authorized_operations (admin_client ,
@@ -177,10 +181,29 @@ def verify_describe_groups(cluster, admin_client, topic):
177181 assert group == desc .group_id
178182 assert desc .is_simple_consumer_group is False
179183 assert desc .state == ConsumerGroupState .EMPTY
184+ assert desc .type == ConsumerGroupType .CLASSIC
180185
181186 # Delete group
182187 perform_admin_operation_sync (admin_client .delete_consumer_groups , [group ], request_timeout = 10 )
183188
189+ consumer_group = 'test-group-consumer'
190+
191+ consume_messages (cluster , consumer_group , 'consumer' , topic , 2 )
192+
193+ desc = verify_provided_describe_for_authorized_operations (admin_client ,
194+ admin_client .describe_consumer_groups ,
195+ AclOperation .READ ,
196+ AclOperation .DELETE ,
197+ ResourceType .GROUP ,
198+ consumer_group ,
199+ [consumer_group ])
200+ assert consumer_group == desc .group_id
201+ assert desc .is_simple_consumer_group is False
202+ assert desc .state == ConsumerGroupState .EMPTY
203+ assert desc .type == ConsumerGroupType .CONSUMER
204+
205+ # Delete group
206+ perform_admin_operation_sync (admin_client .delete_consumer_groups , [consumer_group ], request_timeout = 10 )
184207
185208def verify_describe_cluster (admin_client ):
186209 desc = verify_provided_describe_for_authorized_operations (admin_client ,
@@ -217,11 +240,7 @@ def test_describe_operations(sasl_cluster):
217240 verify_describe_topics (admin_client , our_topic )
218241
219242 # Verify Authorized Operations in Describe Groups
220- # Skip this test if using group protocol `consumer`
221- # as there is new RPC for describe_groups() in
222- # group protocol `consumer` case.
223- if not TestUtils .use_group_protocol_consumer ():
224- verify_describe_groups (sasl_cluster , admin_client , our_topic )
243+ verify_describe_groups (sasl_cluster , admin_client , our_topic )
225244
226245 # Delete Topic
227246 perform_admin_operation_sync (admin_client .delete_topics , [our_topic ], operation_timeout = 0 , request_timeout = 10 )
0 commit comments