1616# limitations under the License.
1717
1818import pytest
19+ from enum import Enum
1920from confluent_kafka import ConsumerGroupType , KafkaException
2021from tests .common import TestUtils
2122
@@ -35,7 +36,7 @@ def get_group_protocol_type(a, group_id):
3536 raise
3637
3738
38- def check_consumer (kafka_cluster , consumers , admin_client , topic , expected_protocol ):
39+ def check_consumer (kafka_cluster , consumers , admin_client , group_id , topic , expected_protocol ):
3940 no_of_messages = 100
4041 total_msg_read = 0
4142 expected_partitions_per_consumer = number_of_partitions // len (consumers )
@@ -50,7 +51,7 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto
5051 assert len (assignment ) == expected_partitions_per_consumer
5152 assert len (all_assignments ) == number_of_partitions
5253
53- assert get_group_protocol_type (admin_client , topic ) == expected_protocol
54+ assert get_group_protocol_type (admin_client , group_id ) == expected_protocol
5455
5556 # Produce some messages to the topic
5657 test_data = ['test-data{}' .format (i ) for i in range (0 , no_of_messages )]
@@ -67,6 +68,11 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto
6768 assert total_msg_read == no_of_messages , f"Expected to read { no_of_messages } messages, but read { total_msg_read } "
6869
6970
class Operation(Enum):
    """Kind of consumer-group membership change applied in a test scenario."""

    # Start a new consumer and subscribe it, growing the group.
    ADD = 0
    # Close an existing consumer, shrinking the group.
    REMOVE = 1
7076def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy (kafka_cluster , partition_assignment_strategy ):
7177 """
7278 Test consumer upgrade and downgrade.
@@ -90,22 +96,26 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k
9096 ** consumer_conf
9197 }
9298
93- consumer = kafka_cluster .consumer (consumer_conf_classic )
94- assert consumer is not None
95- consumer .subscribe ([topic ])
96- check_consumer (kafka_cluster , [consumer ], admin_client , topic , ConsumerGroupType .CLASSIC )
97-
98- # Now simulate an upgrade by creating a new consumer with 'consumer' protocol
99- consumer2 = kafka_cluster .consumer (consumer_conf_consumer )
100- assert consumer2 is not None
101- consumer2 .subscribe ([topic ])
102- check_consumer (kafka_cluster , [consumer , consumer2 ], admin_client , topic , ConsumerGroupType .CONSUMER )
103-
104- # Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer
105- consumer2 .close ()
106- check_consumer (kafka_cluster , [consumer ], admin_client , topic , ConsumerGroupType .CLASSIC )
107-
108- consumer .close ()
99+ test_scenarios = [(Operation .ADD , consumer_conf_classic , ConsumerGroupType .CLASSIC ),
100+ (Operation .ADD , consumer_conf_consumer , ConsumerGroupType .CONSUMER ),
101+ (Operation .REMOVE , None , ConsumerGroupType .CONSUMER ),
102+ (Operation .ADD , consumer_conf_classic , ConsumerGroupType .CONSUMER ),
103+ (Operation .REMOVE , None , ConsumerGroupType .CLASSIC )]
104+ consumers = []
105+
106+ for operation , conf , expected_protocol in test_scenarios :
107+ if operation == Operation .ADD :
108+ consumer = kafka_cluster .consumer (conf )
109+ assert consumer is not None
110+ consumer .subscribe ([topic ])
111+ consumers .append (consumer )
112+ elif operation == Operation .REMOVE :
113+ consumer_to_remove = consumers .pop (0 )
114+ consumer_to_remove .close ()
115+ check_consumer (kafka_cluster , consumers , admin_client , topic , topic , expected_protocol )
116+
117+ assert len (consumers ) == 1
118+ consumers [0 ].close ()
109119 kafka_cluster .delete_topic (topic )
110120
111121
0 commit comments