| 
16 | 16 | # limitations under the License.  | 
17 | 17 | 
 
  | 
18 | 18 | import pytest  | 
19 |  | -from confluent_kafka import ConsumerGroupType, IsolationLevel, KafkaException, TopicPartition  | 
20 |  | -from confluent_kafka.admin import OffsetSpec  | 
 | 19 | +from confluent_kafka import ConsumerGroupType, KafkaException  | 
21 | 20 | from tests.common import TestUtils  | 
22 | 21 | 
 
  | 
23 | 22 | topic_prefix = "test_consumer_upgrade_downgrade_"  | 
@@ -76,20 +75,25 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k  | 
76 | 75 |                                                                     })  | 
77 | 76 |     admin_client = kafka_cluster.admin()  | 
78 | 77 | 
 
  | 
79 |  | -    # Create a consumer with the latest version  | 
80 | 78 |     consumer_conf = {'group.id': topic,  | 
81 |  | -                     'auto.offset.reset': 'earliest',  | 
82 |  | -                     'group.protocol': 'classic'}  | 
83 |  | -    consumer_conf['partition.assignment.strategy'] = partition_assignment_strategy  | 
84 |  | -    consumer = kafka_cluster.consumer(consumer_conf)  | 
 | 79 | +                     'auto.offset.reset': 'earliest'}  | 
 | 80 | +    consumer_conf_classic = {  | 
 | 81 | +        'group.protocol': 'classic',  | 
 | 82 | +        'partition.assignment.strategy': partition_assignment_strategy,  | 
 | 83 | +        **consumer_conf  | 
 | 84 | +    }  | 
 | 85 | +    consumer_conf_consumer = {  | 
 | 86 | +        'group.protocol': 'consumer',  | 
 | 87 | +        **consumer_conf  | 
 | 88 | +    }  | 
 | 89 | + | 
 | 90 | +    consumer = kafka_cluster.consumer(consumer_conf_classic)  | 
85 | 91 |     assert consumer is not None  | 
86 | 92 |     consumer.subscribe([topic])  | 
87 | 93 |     check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)  | 
88 |  | -    del consumer_conf['partition.assignment.strategy']  | 
89 | 94 | 
 
  | 
90 | 95 |     # Now simulate an upgrade by creating a new consumer with 'consumer' protocol  | 
91 |  | -    consumer_conf['group.protocol'] = 'consumer'  | 
92 |  | -    consumer2 = kafka_cluster.consumer(consumer_conf)  | 
 | 96 | +    consumer2 = kafka_cluster.consumer(consumer_conf_consumer)  | 
93 | 97 |     assert consumer2 is not None  | 
94 | 98 |     consumer2.subscribe([topic])  | 
95 | 99 |     check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER)  | 
 | 
0 commit comments